[Announce] Zeppelin 0.9.0 is released (Flink on Zeppelin)

2021-01-17 Thread Jeff Zhang
Hi flink folks,

I'd like to let you know that Zeppelin 0.9.0 is officially released. In this
new version we did a big refactoring of Flink support and improved it. It
supports 3 major versions of Flink (1.10, 1.11, 1.12).

You can download zeppelin 0.9.0 here.
http://zeppelin.apache.org/download.html

See the following 2 links for more details on how to use Flink on
Zeppelin:
https://app.gitbook.com/@jeffzhang/s/flink-on-zeppelin/
http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html


-- 
Best Regards

Jeff Zhang


Re: Enable custom REST APIs in Flink

2020-04-21 Thread Jeff Zhang
Hi Flavio,

I am curious to know where the service runs. Do you create this service in a UDF and
run it in the TM?

Flavio Pompermaier wrote on Tue, Apr 21, 2020 at 8:30 PM:

> Hi to all,
> many times it happens that we use Flink as a broker towards the data layer
> but we need to be able to get some specific info from the data sources we
> use (i.e. get triggers and relationships from jdbc).
> The quick and dirty way of achieving this is to run a Flink job that calls
> another service to store the required info. Another solution could be to
> add a custom REST service that contains a lot of dependencies already
> provided by Flink, with the risk of having misaligned versions between the
> two.
> It would be much simpler to let users add custom REST services to
> Flink via a configuration file, something like:
> /myservice1/* -> com.example.MyRestService1
> /myservice2/* -> com.example.MyRestService2
>
> The listed classes should be contained in a jar within the Flink lib dir
> and should implement a common interface.
> In order to avoid path collisions with already existing FLINK services,
> the configured path can be further prefixed with some other token (e.g.
> /userapi/*).
>
> What do you think about this? Does it sound reasonable to you?
> Am I the only one that thinks this could be useful for many use cases?
>
> Best,
> Flavio
>
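As a rough illustration of the proposal quoted above, the sketch below parses lines such as "/myservice1/* -> com.example.MyRestService1" and resolves request paths under a "/userapi" prefix. Nothing here is an existing Flink API: the class UserApiRoutes, its methods, and the config-line format are hypothetical and only model the path-to-class mapping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical routing table for user-defined REST services; not a Flink API. */
final class UserApiRoutes {
    private final String prefix;
    // Insertion-ordered so that the first registered matching pattern wins.
    private final Map<String, String> routes = new LinkedHashMap<>();

    UserApiRoutes(String prefix) {
        this.prefix = prefix;
    }

    /** Parses one config line of the form "/myservice1/* -> com.example.MyRestService1". */
    void register(String configLine) {
        String[] parts = configLine.split("->");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected '<pattern> -> <class>': " + configLine);
        }
        String pattern = parts[0].trim();
        if (!pattern.endsWith("/*")) {
            throw new IllegalArgumentException("Pattern must end with /*: " + pattern);
        }
        // Drop the trailing '*' and store the prefixed base path, e.g. "/userapi/myservice1/".
        String base = prefix + pattern.substring(0, pattern.length() - 1);
        routes.put(base, parts[1].trim());
    }

    /** Returns the handler class name for a request path, if any registered pattern matches. */
    Optional<String> resolve(String requestPath) {
        for (Map.Entry<String, String> e : routes.entrySet()) {
            if (requestPath.startsWith(e.getKey())) {
                return Optional.of(e.getValue());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        UserApiRoutes r = new UserApiRoutes("/userapi");
        r.register("/myservice1/* -> com.example.MyRestService1");
        r.register("/myservice2/* -> com.example.MyRestService2");
        System.out.println(r.resolve("/userapi/myservice1/triggers"));
        System.out.println(r.resolve("/jobs/overview"));
    }
}
```

Because every user route is stored under the extra prefix, a built-in path such as /jobs/overview can never match a user pattern, which is the collision-avoidance idea in the proposal.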


-- 
Best Regards

Jeff Zhang


Re: Enable custom REST APIs in Flink

2020-04-21 Thread Jeff Zhang
I know some users do the same thing in Spark. Usually the service runs on the Spark
driver side. But Flink is different from Spark: the Spark driver is equivalent to the
Flink client plus the Flink job manager. I don't think we currently allow running
any user code in the job manager, so allowing a user-defined service to run in the job
manager might be a big change for Flink.



Flavio Pompermaier wrote on Tue, Apr 21, 2020 at 11:06 PM:

> In my mind the user API could run everywhere but the simplest thing is to
> make them available in the Job Manager (where the other REST API lives).
> They could become a very simple but powerful way to add valuable services
> to Flink without adding useless complexity to the overall architecture for
> just a few methods.
> I don't know whether Spark or Beam allow you to do something like that but
> IMHO it's super useful (especially from a maintenance point of view wrt the
> overall architecture complexity).
>
> @Oytun indeed we'd like to avoid recompiling everything when a single user
> class (i.e. not related to Flink classes) is modified or added. Glad to see
> that there are other people having the same problem here
>

-- 
Best Regards

Jeff Zhang


flink-s3-fs-hadoop retry configuration

2020-05-01 Thread Jeff Henrikson

Hello Flink users,

I could use help with three related questions:

1) How can I observe retries in the flink-s3-fs-hadoop connector?

2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
the hadoop configuration I have provided, as opposed to some separate
default configuration?  My job fails quickly when I read larger or more 
numerous objects from S3.  I conjecture the failure may be related to 
insufficient retries when S3 throttles.


3) What s3 fault recovery approach would you recommend?

Background:

I am having trouble with reliable operation of the flink-s3-fs-hadoop 
connector.   My application sources all its DataStream data from S3, and 
appears to get frequently throttled by s3:


Caused by:
org.apache.flink.streaming.runtime.tasks.AsynchronousException:
Caught exception when processing split: [0]
s3://bucketname/1d/2020/04/15/00-00-00-customers.csv mod@
1586911084000 : 0 + 33554432
. . .
Caused by: java.io.InterruptedIOException: Failed to open
s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv at 0 on
s3a://bucketname/1d/2020/04/15/00-00-00-customers.csv:
com.amazonaws.SdkClientException: Unable to execute HTTP request:
Timeout waiting for connection from pool

The s3 throttling does not seem to trigger retries and so
causes the job to fail.  For troubleshooting purposes, the job stays up
for much longer if I reduce s3 inputs to my job by disabling functionality.

I see in the documentation for hadoop-aws that there are properties
such as fs.s3.maxRetries and fs.s3.sleepTimeSeconds for handling retries
within hadoop.

After wrangling with some classpath troubles, I managed to get
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar to parse a set of hadoop
configuration files {core/hdfs/mapred/yarn}-site.xml.  I can confirm
that the cluster parses the configuration by passing invalid xml and
seeing the cluster crash.

The puzzle with which I am now faced is that the configuration for
retries and timeouts in core-site.xml seems to have no effect on the
application.

I deploy in kubernetes with a custom docker image.  For now, I have
not enabled the zookeeper-based HA.

See below for a frequent stacktrace that I interpret as likely to be
caused by s3 throttling.

Thanks in advance for any help.

Regards,


Jeff Henrikson



2020-04-30 19:35:24
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at jdk.internal.reflect.GeneratedMethodAccessor66.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at

Re: flink-s3-fs-hadoop retry configuration

2020-05-04 Thread Jeff Henrikson

> 2) How can I tell if flink-s3-fs-hadoop is actually managing to pick up
> the hadoop configuration I have provided, as opposed to some separate
> default configuration?

I'm reading the docs and source of flink-fs-hadoop-shaded.  I see that 
core-default-shaded.xml has fs.s3a.connection.maximum set to 15.  I have 
around 20 different DataStreams being instantiated from S3, so if they 
each require one connection to be healthy, then 15 is definitely not a 
good value.
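
To make the arithmetic concrete, here is a toy model of a bounded pool, assuming (as conjectured above) that each of the roughly 20 streams holds one connection for as long as it runs. It uses a plain java.util.concurrent.Semaphore; the class PoolExhaustionSketch is hypothetical and is not how the AWS SDK or the Hadoop S3A client implements its pool.

```java
import java.util.concurrent.Semaphore;

/** Toy model of a bounded connection pool; not the AWS SDK or Hadoop implementation. */
final class PoolExhaustionSketch {

    /**
     * Tries to lease one connection per reader and never returns a lease,
     * as long-lived readers would. Returns how many readers got no connection.
     */
    static int countStarved(int poolSize, int readers) {
        Semaphore pool = new Semaphore(poolSize);
        int starved = 0;
        for (int i = 0; i < readers; i++) {
            // A reader that finds no free permit corresponds to
            // "Timeout waiting for connection from pool" in the real client.
            if (!pool.tryAcquire()) {
                starved++;
            }
        }
        return starved;
    }

    public static void main(String[] args) {
        System.out.println(countStarved(15, 20)); // pool smaller than the number of readers
        System.out.println(countStarved(32, 20)); // pool larger than the number of readers
    }
}
```

With a pool of 15 and 20 readers, 5 readers are left without a connection; once the pool is at least as large as the number of concurrent readers, none are.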


However, I seem to be unable to override fs.s3a.connection.maximum using 
my core-site.xml.  I am also unable to see the DEBUG level messages for 
the shaded flink-fs-hadoop-shaded if I set log4j.rootLogger=DEBUG.


So now I'm wondering:

1) Anybody know how to see DEBUG output for flink-fs-hadoop-shaded?

2) Am I going to end up rebuilding flink-fs-hadoop-shaded to
override the config?


Thanks in advance,


Jeff Henrikson



https://github.com/apache/flink/tree/master/flink-filesystems/flink-fs-hadoop-shaded

https://github.com/apache/flink/blob/master/flink-filesystems/flink-fs-hadoop-shaded/src/main/resources/core-default-shaded.xml

  
<property>
  <name>fs.s3a.connection.maximum</name>
  <value>15</value>
  <description>Controls the maximum number of simultaneous connections to S3.</description>
</property>

  





Overriding hadoop core-site.xml keys using the flink-fs-hadoop-shaded assemblies

2020-05-05 Thread Jeff Henrikson
Has anyone had success overriding hadoop core-site.xml keys using the 
flink-fs-hadoop-shaded assemblies?  If so, what versions were known to work?


Using btrace, I am seeing a bug in the hadoop shaded dependencies 
distributed with 1.10.0.  Some (but not all) of the core-site.xml keys 
cannot be overridden.


Thanks,


Jeff Henrikson


Re: What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Jeff Zhang
I use JobListener#onJobExecuted to be notified that the Flink job is done.
It is pretty reliable for me; the only exception is when the client process is
down.

BTW, the reason you see the ApplicationNotFound exception is that the YARN app has
terminated, which means the Flink cluster is shut down. In standalone
mode, by contrast, the Flink cluster is always up.


Caizhi Weng wrote on Fri, May 8, 2020 at 2:47 PM:

> Hi dear Flink community,
>
> I would like to determine whether a job has finished (no matter
> successfully or exceptionally) in my code.
>
> I used to think that JobClient#getJobStatus is a good idea, but I found
> that it behaves quite differently under different executing environments.
> For example, under a standalone session cluster it will return the FINISHED
> status for a finished job, while under a yarn per job cluster it will throw
> an ApplicationNotFound exception. I'm afraid that there might be other
> behaviors for other environments.
>
> So what's the best practice to determine whether a job has finished or
> not? Note that I'm not waiting for the job to finish. If the job hasn't
> finished I would like to know it and do something else.
>


-- 
Best Regards

Jeff Zhang


Re: Table Environment for Remote Execution

2020-06-03 Thread Jeff Zhang
Hi Satyam,

I ran into the same issue when I integrated Flink with Zeppelin. Here's
what I did:

https://github.com/apache/zeppelin/blob/master/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java#L226

If you are interested in Flink on Zeppelin, you can refer to the following
blogs and videos.

Flink on Zeppelin video
https://www.youtube.com/watch?v=YxPo0Fosjjg&list=PL4oy12nnS7FFtg3KV1iS5vDb0pTz12VcX

Flink on Zeppelin tutorial blogs:
1) Get started https://link.medium.com/oppqD6dIg5
2) Batch https://link.medium.com/3qumbwRIg5
3) Streaming https://link.medium.com/RBHa2lTIg5
4) Advanced usage https://link.medium.com/CAekyoXIg5



Satyam Shekhar wrote on Thu, Jun 4, 2020 at 2:27 AM:

>
> Thanks, Jark & Godfrey.
>
> The workaround was successful.
>
> I have created the following ticket to track the issue -
> https://issues.apache.org/jira/browse/FLINK-18095
>
> Regards,
> Satyam
>
> On Wed, Jun 3, 2020 at 3:26 AM Jark Wu  wrote:
>
>> Hi Satyam,
>>
>> In the long term, TableEnvironment is the entry point for pure Table/SQL
>> users. So it should have all the ability of StreamExecutionEnvironment.
>> I think remote execution is a reasonable feature. Could you create a
>> JIRA issue for this?
>>
>> As a workaround, you can construct `StreamTableEnvironmentImpl` by
>> yourself via constructor, it can support batch mode
>> and StreamExecutionEnvironment.
>>
>> Best,
>> Jark
>>
>>
>> On Wed, 3 Jun 2020 at 16:35, Satyam Shekhar 
>> wrote:
>>
>>> Thanks for the reply, Godfrey.
>>>
>>> I would also love to learn the reasoning behind that limitation.
>>>
>>> For more context, I am building a Java application that receives some
>>> user input via a GRPC service. The user's input is translated to some SQL
>>> that may be executed in streaming or batch mode based on custom business
>>> logic and is then submitted to Flink for execution. In my current setup, I
>>> create an ExecutionEnvironment, register sources, and execute the
>>> corresponding SQL. I was able to achieve the desired behavior with
>>> StreamTableEnvironment but it has limitations around supported SQL in batch
>>> mode.
>>>
>>> While invoking the CLI from java program might be a solution, it doesn't
>>> feel like the most natural solution for the problem. Are there other ways
>>> to address this?
>>>
>>> Regards,
>>> Satyam
>>>
>>> On Wed, Jun 3, 2020 at 12:50 AM godfrey he  wrote:
>>>
>>>> Hi Satyam,
>>>>
>>>> For Blink batch mode, only TableEnvironment can be used,
>>>> and TableEnvironment does not take a StreamExecutionEnvironment as an argument.
>>>> Instead, a StreamExecutionEnvironment instance is created internally.
>>>>
>>>> back to your requirement, you can build your table program as user jar,
>>>> and submit the job through flink cli [1] to remote environment.
>>>>
>>>> Bests,
>>>> Godfrey
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
>>>>
>>>>
>>>>
>>>> Satyam Shekhar wrote on Wed, Jun 3, 2020 at 2:59 PM:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am running into a very basic problem while working with Table API. I
>>>>> wish to create a TableEnvironment connected to a remote environment that
>>>>> uses Blink planner in batch mode. Examples and documentation I have come
>>>>> across so far recommend the following pattern to create such an
>>>>> environment -
>>>>>
>>>>> var settings = EnvironmentSettings.newInstance()
>>>>>   .useBlinkPlanner()
>>>>>   .inBatchMode()
>>>>>   .build();
>>>>> var tEnv = TableEnvironment.create(settings);
>>>>>
>>>>> The above configuration, however, does not connect to a remote
>>>>> environment. Tracing code in TableEnvironment.java, I see the
>>>>> following method in BlinkExecutorFactory.java that appears to be
>>>>> relevant -
>>>>>
>>>>> Executor create(Map, StreamExecutionEnvironment);
>>>>>
>>>>> However, it seems to be only accessible through the Scala bridge. I
>>>>> can't seem to find a way to instantiate a TableEnvironment that takes
>>>>> StreamExecutionEnvironment as an argument. How do I achieve that?
>>>>>
>>>>> Regards,
>>>>> Satyam
>>>>>
>>>>

-- 
Best Regards

Jeff Zhang


Re: Run command after Batch is finished

2020-06-05 Thread Jeff Zhang
You can try JobListener, which you can register with the ExecutionEnvironment.

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
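
A minimal sketch of the idea, assuming the goal is to release a lock once the job is done. To keep it runnable without Flink on the classpath, it uses a stand-in interface (JobListenerSketch) whose two callbacks mirror the shape of Flink's JobListener; the String parameters replace Flink's JobClient and JobExecutionResult, and LockReleasingListener and releaseLock are hypothetical names. In a real job you would implement org.apache.flink.core.execution.JobListener and register it via registerJobListener on the environment. The sketch does not model attached versus detached execution.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in mirroring the two callbacks of Flink's JobListener; parameter types are simplified. */
interface JobListenerSketch {
    void onJobSubmitted(String jobId, Throwable error);

    void onJobExecuted(String result, Throwable error);
}

/** Releases a (hypothetical) distributed lock once the job has finished, successfully or not. */
final class LockReleasingListener implements JobListenerSketch {
    final List<String> events = new ArrayList<>();

    @Override
    public void onJobSubmitted(String jobId, Throwable error) {
        if (error != null) {
            // Submission failed, so the job never ran: release right away.
            releaseLock("submit-failed");
        }
    }

    @Override
    public void onJobExecuted(String result, Throwable error) {
        // A non-null error means the job failed; otherwise it finished normally.
        releaseLock(error == null ? "finished" : "failed");
    }

    private void releaseLock(String reason) {
        // Placeholder for the real lock release (ZooKeeper, a database row, etc.).
        events.add("lock released: " + reason);
    }

    public static void main(String[] args) {
        LockReleasingListener listener = new LockReleasingListener();
        listener.onJobSubmitted("job-1", null);
        listener.onJobExecuted("ok", null);
        System.out.println(listener.events);
    }
}
```

Handling the failed-submission case separately matters because no execution callback can be expected for a job that was never accepted.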


Mark Davis wrote on Sat, Jun 6, 2020 at 12:00 AM:

> Hi there,
>
> I am running a Batch job with several outputs.
> Is there a way to run some code(e.g. release a distributed lock) after all
> outputs are finished?
>
> Currently I do this in a try-finally block around
> ExecutionEnvironment.execute() call, but I have to switch to the detached
> execution mode - in this mode the finally block is never run.
>
> Thank you!
>
>   Mark
>


-- 
Best Regards

Jeff Zhang


Re: Run command after Batch is finished

2020-06-06 Thread Jeff Zhang
It runs on the client side, where the ExecutionEnvironment is created.

Mark Davis wrote on Sat, Jun 6, 2020 at 8:14 PM:

> Hi Jeff,
>
> Thank you very much! That is exactly what I need.
>
> Where will the listener code run in a cluster deployment (YARN, k8s)?
> Will it be sent over the network?
>
> Thank you!
>
>   Mark
>

-- 
Best Regards

Jeff Zhang


Re: pyflink data query

2020-06-09 Thread Jeff Zhang
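You can use Zeppelin's z.show to query the job result. Here is an introductory tutorial for pyflink on Zeppelin:
https://www.bilibili.com/video/BV1Te411W73b?p=20
You can also join the DingTalk group for discussion: 30022475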



jack wrote on Tue, Jun 9, 2020 at 5:28 PM:

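> A question:
> Description: with pyflink, I run queries, aggregations, and similar operations on data from a source via SQL.
> Instead of writing the output to a sink, I would like it to be available directly as a result, so that I can build a web API that queries this result directly without having to query the sink.
>
> Can Flink work this way?
> Thanks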
>


-- 
Best Regards

Jeff Zhang


Re: flink-s3-fs-hadoop retry configuration

2020-06-17 Thread Jeff Henrikson

Robert,

Thanks for the tip!

Before you replied, I did figure out, using btrace, that the keys belong in
flink-conf.yaml.  I instrumented the methods
org.apache.hadoop.conf.Configuration.get for the keys, and 
org.apache.hadoop.conf.Configuration.substituteVars for effective 
values.  (There is a btrace bug where you can't just observe the return 
value from .get directly.)


I did not see in the code any way to observe the effective configuration 
using logging.


Regards,


Jeff



On 5/8/20 7:29 AM, Robert Metzger wrote:

I validated my assumption. Putting

s3.connection.maximum: 123456

into the flink-conf.yaml file results in the following DEBUG log output:

2020-05-08 16:20:47,461 DEBUG org.apache.flink.fs.s3hadoop.common.HadoopConfigLoader [] - Adding Flink config entry for s3.connection.maximum as fs.s3a.connection.maximum to Hadoop config


I guess that is the recommended way of passing configuration into the S3 
connectors of Flink.
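
The key translation visible in that DEBUG line can be sketched as a simple prefix rewrite. This is only an illustration of the mapping from "s3." to "fs.s3a." shown in the log; the class S3KeyMirrorSketch is hypothetical and is not the code of Flink's HadoopConfigLoader, which may handle more prefixes and cases.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrates the key translation seen in the DEBUG line; not Flink's HadoopConfigLoader. */
final class S3KeyMirrorSketch {
    private static final String FLINK_PREFIX = "s3.";
    private static final String HADOOP_PREFIX = "fs.s3a.";

    /** Copies every "s3.*" entry to an "fs.s3a.*" entry and ignores all other keys. */
    static Map<String, String> mirror(Map<String, String> flinkConf) {
        Map<String, String> hadoopConf = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : flinkConf.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(FLINK_PREFIX)) {
                hadoopConf.put(HADOOP_PREFIX + key.substring(FLINK_PREFIX.length()), e.getValue());
            }
        }
        return hadoopConf;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new LinkedHashMap<>();
        flinkConf.put("s3.connection.maximum", "123456");
        flinkConf.put("taskmanager.numberOfTaskSlots", "1");
        System.out.println(mirror(flinkConf));
    }
}
```

Seen this way, a key placed in core-site.xml never passes through this translation, which is consistent with the overrides there appearing to have no effect.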


You also asked how to detect retries: DEBUG-log level is helpful again. 
I just tried connecting against an invalid port, and got these messages:


2020-05-08 16:26:37,671 DEBUG org.apache.http.impl.conn.DefaultManagedHttpClientConnection [] - http-outgoing-7: Shutdown connection
2020-05-08 16:26:37,671 DEBUG org.apache.http.impl.execchain.MainClientExec [] - Connection discarded
2020-05-08 16:26:37,671 DEBUG org.apache.http.impl.conn.PoolingHttpClientConnectionManager [] - Connection released: [id: 7][route: {}->http://127.0.0.1:9000][total kept alive: 0; route allocated: 0 of 123456; total allocated: 0 of 123456]
2020-05-08 16:26:37,671 DEBUG com.amazonaws.request [] - Retrying Request: HEAD http://127.0.0.1:9000 /test/ Headers: (User-Agent: Hadoop 3.1.0, aws-sdk-java/1.11.271 Mac_OS_X/10.15.3 OpenJDK_64-Bit_Server_VM/25.252-b09 java/1.8.0_252 scala/2.11.12, amz-sdk-invocation-id: 051f9877-1c22-00ed-ad26-8361bcf14b98, Content-Type: application/octet-stream, )
2020-05-08 16:26:37,671 DEBUG com.amazonaws.http.AmazonHttpClient [] - Retriable error detected, will retry in 4226ms, attempt number: 7



Maybe it makes sense to set the log level to DEBUG only for
"com.amazonaws.http.AmazonHttpClient".


How to configure the log level depends on the deployment method.
Usually, it's done by replacing the first INFO with DEBUG in
conf/log4j.properties. ("rootLogger.level = DEBUG")



Best,
Robert

On Fri, May 8, 2020 at 3:51 PM Robert Metzger <rmetz...@apache.org> wrote:


Hey Jeff,

Which Flink version are you using?
Have you tried configuring the S3 filesystem via Flink's  config
yaml? Afaik all config parameters prefixed with "s3." are mirrored
into the Hadoop file system connector.


Trouble with large state

2020-06-17 Thread Jeff Henrikson

Hello Flink users,

I have an application of around 10 enrichment joins.  All events are 
read from kafka and have event timestamps.  The joins are built using 
.cogroup, with a global window, triggering on every 1 event, plus a 
custom evictor that drops records once a newer record for the same ID 
has been processed.  Deletes are represented by empty events with 
timestamp and ID (tombstones). That way, we can drop records when 
business logic dictates, as opposed to when a maximum retention has been 
attained.  The application runs RocksDBStateBackend, on Kubernetes on 
AWS with local SSDs.
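
The retention rule described above can be sketched in isolation as follows. This is a toy model only: the actual application implements the rule as a custom evictor on a global window, whereas the hypothetical LatestPerIdSketch class below uses a plain map and a made-up Event shape in which a null payload marks a tombstone. It does not handle a late record that arrives after its tombstone.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of "keep only the newest record per ID; an empty payload deletes the ID". */
final class LatestPerIdSketch {

    /** Hypothetical event shape: payload == null marks a tombstone. */
    static final class Event {
        final String id;
        final long timestamp;
        final String payload;

        Event(String id, long timestamp, String payload) {
            this.id = id;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    private final Map<String, Event> latest = new HashMap<>();

    /** Applies one event; a record older than the stored one for the same ID is ignored. */
    void apply(Event e) {
        Event current = latest.get(e.id);
        if (current != null && current.timestamp > e.timestamp) {
            return; // an older record arrived late; keep the newer one
        }
        if (e.payload == null) {
            latest.remove(e.id); // tombstone: drop all state for this ID
        } else {
            latest.put(e.id, e); // newer record evicts the previous one
        }
    }

    /** Returns the retained payload for an ID, or null if nothing is retained. */
    String get(String id) {
        Event e = latest.get(id);
        return e == null ? null : e.payload;
    }

    int size() {
        return latest.size();
    }

    public static void main(String[] args) {
        LatestPerIdSketch state = new LatestPerIdSketch();
        state.apply(new Event("a", 1L, "v1"));
        state.apply(new Event("a", 2L, "v2"));
        System.out.println(state.get("a"));
        state.apply(new Event("a", 3L, null));
        System.out.println(state.size());
    }
}
```

Under this rule the retained state is bounded by the number of live IDs rather than by a time-based retention period.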


Unit tests show that the joins produce expected results.  On an 8 node 
cluster, watermark output progress seems to indicate I should be able to 
bootstrap my state of around 500GB in around 1 day.  I am able to save 
and restore savepoints for the first half an hour of run time.


My current trouble is that after around 50GB of state, I stop being able 
to reliably take checkpoints or savepoints.  Some time after that, I 
start getting a variety of failures where the first suspicious log event 
is a generic cluster connectivity error, such as:


1) java.io.IOException: Connecting the channel failed: Connecting
to remote task manager + '/10.67.7.101:38955' has failed. This
might indicate that the remote task manager has been lost.

2) org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connection unexpectedly closed by remote task manager 'null'. This
might indicate that the remote task manager was lost.

3) Association with remote system
[akka.tcp://flink@10.67.6.66:34987] has failed, address is now
gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink@10.67.6.66:34987]] Caused by:
[java.net.NoRouteToHostException: No route to host]

I don't see any obvious out of memory errors on the TaskManager UI.

Adding nodes to the cluster does not seem to increase the maximum 
savable state size.


I could enable HA, but for the time being I have been leaving it out to 
avoid the possibility of masking deterministic faults.


Below are my configurations.

Thanks in advance for any advice.

Regards,


Jeff Henrikson



Flink version: 1.10

Configuration set via code:
parallelism=8
maxParallelism=64
setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
setTolerableCheckpointFailureNumber(1000)
setMaxConcurrentCheckpoints(1)

enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
RocksDBStateBackend
setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
setNumberOfTransferThreads(25)
setDbStoragePath points to a local nvme SSD

Configuration in flink-conf.yaml:

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.size: 28000m
taskmanager.memory.process.size: 28000m
taskmanager.memory.jvm-metaspace.size: 512m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: full

cluster.evenly-spread-out-slots: false

taskmanager.memory.network.fraction: 0.2   # default 0.1
taskmanager.memory.framework.off-heap.size: 2GB
taskmanager.memory.task.off-heap.size: 2GB
taskmanager.network.memory.buffers-per-channel: 32 # default 2
taskmanager.memory.managed.fraction: 0.4   # docs say default 0.1, but something seems to set 0.4

taskmanager.memory.task.off-heap.size: 2048MB  # default 128M

state.backend.fs.memory-threshold: 1048576
state.backend.fs.write-buffer-size: 1024
state.backend.local-recovery: true
state.backend.rocksdb.writebuffer.size: 64MB
state.backend.rocksdb.writebuffer.count: 8
state.backend.rocksdb.writebuffer.number-to-merge: 4
state.backend.rocksdb.timer-service.factory: heap
state.backend.rocksdb.block.cache-size: 6400 # default 8MB
state.backend.rocksdb.write-batch-size: 1600 # default 2MB

web.checkpoints.history: 250


Re: Trouble with large state

2020-06-18 Thread Jeff Henrikson

Hi Yun,

Thanks for your thoughts.  Answers to your questions:

>  1. "after around 50GB of state, I stop being able to reliably take
> checkpoints or savepoints. "
> What is the exact reason that job cannot complete checkpoint?
> Expired before completing or decline by some tasks? The former one
> is mainly caused by high back-pressure and the latter one is mainly
> due to some internal error.

In the UI, under Job | Checkpoints | History, then opening the 
checkpoint detail, the checkpoints fail because some operators do not 
acknowledge.  It's always a subset of the larger state operators 
that stop acknowledging.  The exact selection of operators that stop is 
nondeterministic.  The checkpoints frequently fail before any timeout 
that I impose on them.


>  2. Have you checked what reason the remote task manager is lost?
> If the remote task manager is not crashed, it might be due to GC
> impact, I think you might need to check task-manager logs and GC
> logs.


The only general pattern I have observed is:

1) Some taskmanager A throws one of the various connectivity
exceptions I listed complaining about another taskmanager B.
2) Taskmanager B shows no obvious error other than complaining
that taskmanager A has disconnected from it.

Regards,


Jeff Henrikson



On 6/17/20 9:52 PM, Yun Tang wrote:

Hi Jeff

 1. "after around 50GB of state, I stop being able to reliably take
checkpoints or savepoints. "
What is the exact reason that job cannot complete checkpoint?
Expired before completing or decline by some tasks? The former one
is mainly caused by high back-pressure and the latter one is mainly
due to some internal error.
 2. Have you checked what reason the remote task manager is lost?
If the remote task manager is not crashed, it might be due to GC
impact, I think you might need to check task-manager logs and GC logs.

Best
Yun Tang
----
*From:* Jeff Henrikson 
*Sent:* Thursday, June 18, 2020 1:46
*To:* user 
*Subject:* Trouble with large state
Hello Flink users,

I have an application of around 10 enrichment joins.  All events are
read from kafka and have event timestamps.  The joins are built using
.cogroup, with a global window, triggering on every 1 event, plus a
custom evictor that drops records once a newer record for the same ID
has been processed.  Deletes are represented by empty events with
timestamp and ID (tombstones). That way, we can drop records when
business logic dictates, as opposed to when a maximum retention has been
attained.  The application runs RocksDBStateBackend, on Kubernetes on
AWS with local SSDs.

Unit tests show that the joins produce expected results.  On an 8 node
cluster, watermark output progress seems to indicate I should be able to
bootstrap my state of around 500GB in around 1 day.  I am able to save
and restore savepoints for the first half an hour of run time.

My current trouble is that after around 50GB of state, I stop being able
to reliably take checkpoints or savepoints.  Some time after that, I
start getting a variety of failures where the first suspicious log event
is a generic cluster connectivity error, such as:

  1) java.io.IOException: Connecting the channel failed: Connecting
  to remote task manager + '/10.67.7.101:38955' has failed. This
  might indicate that the remote task manager has been lost.

  2) org.apache.flink.runtime.io.network.netty.exception
  .RemoteTransportException: Connection unexpectedly closed by remote
  task manager 'null'. This might indicate that the remote task
  manager was lost.

  3) Association with remote system
  [akka.tcp://flink@10.67.6.66:34987] has failed, address is now
  gated for [50] ms. Reason: [Association failed with
  [akka.tcp://flink@10.67.6.66:34987]] Caused by:
  [java.net.NoRouteToHostException: No route to host]

I don't see any obvious out of memory errors on the TaskManager UI.

Adding nodes to the cluster does not seem to increase the maximum
savable state size.

I could enable HA, but for the time being I have been leaving it out to
avoid the possibility of masking deterministic faults.

Below are my configurations.

Thanks in advance for any advice.

Regards,


Jeff Henrikson



Flink version: 1.10

Configuration set via code:
  parallelism=8
  maxParallelism=64
  setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
  setTolerableCheckpointFailureNumber(1000)
  setMaxConcurrentCheckpoints(1)

enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  RocksDBStateBackend
  setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
  setNumberOfTransferThreads(25)
  setDbStoragePath points to a local nvme SS

Re: Trouble with large state

2020-06-18 Thread Jeff Henrikson

Vijay,

Thanks for your thoughts.  Below are answers to your questions.

> 1. What's your checkpoint interval?

I have used many different checkpoint intervals, ranging from 5 minutes 
to never.  I usually call setMinPauseBetweenCheckpoints with the same 
value as the checkpoint interval.


> 2. How frequently are you updating the state into RocksDB?

My understanding is that for .cogroup:

  - Triggers control communication outside the operator
  - Evictors control cleanup of internal state
  - Configurations like write buffer size control the frequency of 
state change at the storage layer
  - There is no control for how frequently the window state updates at 
the RocksDB API layer.


Thus, the state updates whenever data is ingested.

> 3. How many task managers are you using?

Usually I have been running with one slot per taskmanager.  28GB of 
usable ram on each node.


> 4. How much data each task manager handles while taking the checkpoint?

Funny you should ask.  I would be okay with zero.

The application I am replacing has a latency of 36-48 hours, so if I had 
to fully stop processing to take every snapshot synchronously, it might 
be seen as totally acceptable, especially for initial bootstrap.  Also, 
the velocity of running this backfill is approximately 115x real time on 
8 nodes, so the steady-state run may not exhibit the failure mode in 
question at all.


It has come as some frustration to me that, in the case of 
RocksDBStateBackend, the configuration key state.backend.async 
effectively has no meaningful way to be false.


The only way I have found in the existing code to get a behavior like 
synchronous snapshot is to POST to /jobs/:jobid/stop with drain=false 
and a URL.  This method of failing fast is the way that I discovered 
that I needed to increase transfer threads from the default.
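
For reference, the stop request described above could be built roughly as follows. This is a minimal sketch using the JDK 11 java.net.http client, not code from the application: the class name, REST base URL, job ID, and savepoint directory are placeholders, and the "targetDirectory" and "drain" body fields are given from memory of the Flink REST documentation, so they should be checked against the Flink version in use. The request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds the stop-with-savepoint REST call.
final class StopWithSavepoint {
    // JSON body for the stop call; field names are an assumption based on the Flink REST docs.
    static String body(String targetDirectory, boolean drain) {
        return "{\"targetDirectory\": \"" + targetDirectory + "\", \"drain\": " + drain + "}";
    }

    // Builds (but does not send) POST <restBase>/jobs/<jobId>/stop with drain=false.
    static HttpRequest request(String restBase, String jobId, String targetDirectory) {
        return HttpRequest.newBuilder()
                .uri(URI.create(restBase + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(targetDirectory, false)))
                .build();
    }
}
```

Sending it would be a matter of passing the request to HttpClient.newHttpClient().send(...) with a string body handler.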


The reason I don't just run the whole backfill and then take one 
snapshot is that even in the absence of checkpoints, a very similar 
congestion seems to take the cluster down when I am say 20-30% of the 
way through my backfill.


Reloading from my largest feasible snapshot makes it possible to make 
another snapshot a bit larger before crash, but not by much.


On first glance, the code change to allow RocksDBStateBackend into a 
synchronous snapshots mode looks pretty easy.  Nevertheless, I was 
hoping to do the initial launch of my application without needing to 
modify the framework.


Regards,


Jeff Henrikson


On 6/18/20 7:28 AM, Vijay Bhaskar wrote:

For me this seems to be an IO bottleneck at your task manager.
I have a couple of queries:
1. What's your checkpoint interval?
2. How frequently are you updating the state into RocksDB?
3. How many task managers are you using?
4. How much data each task manager handles while taking the checkpoint?

For points (3) and (4), you should be very careful. I feel this is where 
you are stuck.
Try to scale vertically by adding more CPU and memory to each 
task manager.

If that does not help, try to scale horizontally so that the IO load on each task manager is reduced.
Apart from that, check whether there is any bottleneck in the file system.

Regards
Bhaskar





On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <vict...@gmail.com> wrote:


I had a similar problem.   I ended up solving it by not relying on
checkpoints for recovery and instead re-reading my input sources (in my
case a kafka topic) from the earliest offset and rebuilding only the
state I need.  I only need to care about the past 1 to 2 days of
state so can afford to drop anything older.   My recovery time went
from over an hour for just the first checkpoint to under 10 minutes.

Tim


Re: Trouble with large state

2020-06-19 Thread Jeff Henrikson

Bhaskar,

Thank you for your thoughtful points.

> I want to discuss more on points (1) and (2)
> If we take care of them  rest will be good
>
> Coming to (1)
>
> Please try to give reasonable checkpoint interval time for every job.
> Minimum checkpoint interval recommended by flink community is 3 minutes
> I think you should give minimum 3 minutes checkpoint interval for all

I have spent very little time testing with checkpoint intervals of under 
3 minutes.  I frequently test with intervals of 5 minutes and of 30 
minutes.  I also test with checkpoint intervals such as 60 minutes, and 
never (manual only).  In terms of which exceptions get thrown, I don't 
see much difference between 5/30/60.


Infinity (no checkpoint interval) seems to be an interesting value, 
because before crashing, it seems to process around twice as much state 
as with any finite checkpoint interval.  The largest savepoints I have 
captured have been manually triggered using the /jobs/:jobid/stop REST 
API.  I think it helps for the snapshot to be synchronous.


One curiosity about the /jobs/:jobid/stop command is that from the time of 
the command, it often takes many minutes for the internal processing to 
stop.


Another curiosity about the /jobs/:jobid/stop command is that sometimes 
following a completed savepoint, the cluster goes back to running!


> Coming to (2)
>
> What's your input data rate?

My application involves what I will call "main" events that are enriched 
by "secondary" events.  While the secondary events have several 
different input streams, data types, and join keys, I will estimate the 
secondary events all together.  My estimate for input rate is as follows:


50M "main" events
50 secondary events for each main event, for a
total of around 2.5B input events
8 nodes
20 hours

Combining these figures, we can estimate:

50000000*50/8/20/3600 = 4340 events/second/node
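
The same estimate can be checked with a few lines of arithmetic. A minimal sketch in Java; the class and method names are illustrative only, and the inputs are the figures listed above (50M main events, 50 secondary events per main event, 8 nodes, 20 hours).

```java
// Back-of-envelope input rate per node during the backfill.
final class BackfillRate {
    // Events per second per node, given total events, node count, and run time in hours.
    static double eventsPerSecondPerNode(long totalEvents, int nodes, double hours) {
        return totalEvents / (double) nodes / hours / 3600.0;
    }

    public static void main(String[] args) {
        long mainEvents = 50_000_000L;               // 50M "main" events
        long secondaryPerMain = 50L;                 // 50 secondary events per main event
        long total = mainEvents * secondaryPerMain;  // around 2.5B input events
        // prints "4340 events/second/node"
        System.out.printf("%.0f events/second/node%n",
                eventsPerSecondPerNode(total, 8, 20.0));
    }
}
```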

I don't see how to act on your advice for (2).  Maybe your idea is that 
during backfill/bootstrap, I artificially throttle the inputs to my 
application?


100% of my application state is due to .cogroup, which manages a 
HeapListState on its own.  I cannot think of any controls for changing 
how .cogroup handles internal state per se.  I will paste below the 
Flink code path that .cogroup uses to update its internal state when it 
runs my application.


The only control I can think of with .cogroup that indirectly impacts 
internal state is delayed triggering.


Currently I use a trigger on every event, which I understand creates a 
suboptimal number of events.  I previously experimented with delayed 
triggering, but I did not get good results.


Just now I tried again with a ContinuousProcessingTimeTrigger of 30 seconds, 
with rocksdb.timer-service.factory: heap, and a 5 minute checkpoint 
interval.  The first checkpoint failed, which has been rare when I use 
all the same parameters except for triggering on every event.  So it 
looks worse not better.


Thanks again,


Jeff Henrikson




On 6/18/20 11:21 PM, Vijay Bhaskar wrote:

Thanks for the reply.
I want to discuss more on points (1) and (2)
If we take care of them  rest will be good

Coming to (1)

Please try to give reasonable checkpoint interval time for every job. 
Minimum checkpoint interval recommended by flink community is 3 minutes

I think you should give minimum 3 minutes checkpoint interval for all

Coming to (2)

What's your input data rate?
For example, say you are seeing data at 100 msg/sec. If each message 
changes state and you update the state in RocksDB, 
it's going to
create 100 rows in 1 second at the RocksDB end. If on average 50 records 
have changed each second, then even if you are using RocksDB 
differentialstate = true,
there is no use, because every time 50% of the rows are new. So the 
best bet is to update records in RocksDB only once per checkpoint 
interval.
Suppose your checkpoint interval is 5 minutes. If you update RocksDB 
state once in 5 minutes, then the rate at which new records are added to 
RocksDB will be 1 record/5 min,
whereas in your original scenario 30000 records are added to RocksDB in 5 
min. That is a 1:30000 reduction in records added to RocksDB, which 
will save a huge
redundant size addition to RocksDB. Ultimately your state is driven by 
your checkpoint interval. On recovery the input source goes back up to 5 
minutes and the state is read from that point, so on the RocksDB side
a state update once in 5 minutes should also work. Updating state more 
often than that is of no use.


Regards
Bhaskar

Try to update your RocksDB state at an interval equal to the checkpoint 
interval. Otherwise, as I have observed many times, state size grows 
unnecessarily.

On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson <jehenri...@gmail.com> wrote:


Vijay,

Thanks for yo

Re: Trouble with large state

2020-06-19 Thread Jeff Henrikson

Bhaskar,

Based on your idea of limiting input to get better checkpoint behavior, 
I made a ProcessFunction that constrains each input to a number of events 
per second per slot.  I do need to do some stateless input 
scanning before joins.  The stateless part needs to be fast and does not 
impact snapshots.  So I inserted the throttling after the input 
preprocessing but before the stateful transformations.  There is a 
significant difference in snapshot throughput (often 5x or larger) when 
I change the throttle between 200 and 300 events per second (per slot 
per input).


Hope the throttling keeps being effective as I keep the job running longer.

Odd.  But likely a very effective way out of my problem.

I wonder what drives it . . .  Thread contention?  IOPS contention?

See ProcessFunction code below.

Many thanks!


Jeff



import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Set eventsPerSecMax to -1 to disable the throttle
// TODO: Actual number of events can be slightly larger
// TODO: Remove pause correlation with system clock

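case class Throttler[T](eventsPerSecMax: Double) extends ProcessFunction[T, T] {

  // Wall-clock minute of the current counting window
  var minutePrev = 0
  // Events seen so far in the current minute
  var numEvents = 0

  def minutes(): Int = {
    val ms = System.currentTimeMillis()
    (ms / 1000 / 60).toInt
  }

  def increment(): Unit = {
    val m = minutes()
    if (m != minutePrev) {
      // New minute: remember it and restart the count.  Without updating
      // minutePrev, the counter would reset on every event.
      minutePrev = m
      numEvents = 0
    }
    numEvents += 1
  }

  // Average events per second over the current minute
  def eps(): Double = numEvents / 60.0

  override def processElement(x: T, ctx: ProcessFunction[T, T]#Context,
                              out: Collector[T]): Unit = {
    increment()
    // Once over budget for this minute, pause one second per event
    if (eventsPerSecMax > 0 && eps() > eventsPerSecMax) {
      Thread.sleep(1000L)
    }
    out.collect(x)
  }
}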
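
One of the TODOs above is to remove the pause's correlation with the system clock. A minimal sketch of one way to do that, written in plain Java with no Flink dependency: instead of counting events per wall-clock minute, space events evenly by tracking the earliest time the next event may pass. The class name Pacer and its method are hypothetical, not part of Flink or of the application above.

```java
// Paces events at a fixed maximum rate using a caller-supplied monotonic clock reading.
final class Pacer {
    private final long intervalNanos;                 // minimum spacing between events
    private long nextAllowedNanos = Long.MIN_VALUE;   // earliest time the next event may pass

    Pacer(double eventsPerSecMax) {
        this.intervalNanos = (long) (1_000_000_000L / eventsPerSecMax);
    }

    // Returns how long the caller should wait before emitting the event seen at nowNanos,
    // and reserves the following slot.
    long delayNanos(long nowNanos) {
        long start = Math.max(nowNanos, nextAllowedNanos);
        nextAllowedNanos = start + intervalNanos;
        return start - nowNanos;
    }
}
```

Inside processElement, the caller would compute `long d = pacer.delayNanos(System.nanoTime())` and, if d is positive, call `Thread.sleep(d / 1_000_000, (int) (d % 1_000_000))` before collecting the element. Like the original, this blocks the task thread, which is what produces the backpressure.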


Re: Trouble with large state

2020-06-20 Thread Jeff Henrikson

Bhaskar,

> Glad to know some progress.

Yeah, some progress.  Yet overnight run didn't look as good as I hoped.

The throttling required to not crash during snapshots seems to be quite 
different from the throttling required to not crash outside of snapshots. 
So the lowest common denominator is quite a large performance penalty.


What's worse, the rate of input that makes the snapshot performance go 
from good to bad seems to change significantly as the state size grows. 
Here is checkpoint history from an overnight run.


Parameters:

- 30 minutes minimum between snapshots
- incremental snapshot mode
- inputs throttled to 100 events per sec per input per slot,
  which is around 1/4 of the unthrottled throughput

Checkpoint history:

ID  Status     Acknowledged  Trigger Time  Latest Acknowledgement  End to End Duration  State Size  Buffered During Alignment
12  COMPLETED  304/304       8:52:22       10:37:18                1h 44m 55s           60.5 GB     0 B
11  COMPLETED  304/304       6:47:03       8:22:19                 1h 35m 16s           53.3 GB     0 B
10  COMPLETED  304/304       5:01:20       6:17:00                 1h 15m 39s           41.0 GB     0 B
9   COMPLETED  304/304       3:47:43       4:31:19                 43m 35s              34.1 GB     0 B
8   COMPLETED  304/304       2:40:58       3:17:42                 36m 43s              27.8 GB     0 B
7   COMPLETED  304/304       1:39:15       2:10:57                 31m 42s              23.1 GB     0 B
6   COMPLETED  304/304       0:58:02       1:09:13                 11m 11s              17.4 GB     0 B
5   COMPLETED  304/304       0:23:27       0:28:01                 4m 33s               14.3 GB     0 B
4   COMPLETED  304/304       23:52:29      23:53:26                56s                  12.7 GB     0 B
3   COMPLETED  304/304       23:20:59      23:22:28                1m 29s               10.8 GB     0 B
2   COMPLETED  304/304       22:46:17      22:50:58                4m 40s               7.40 GB     0 B

As you can see, GB/minute varies drastically.  GB/minute also varies 
drastically with full checkpoint mode.
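
To put numbers on that, here is a small Java sketch that derives GB/minute for a few rows of the checkpoint history, taking the State Size column at face value and dividing by the end-to-end duration. The class and method names are illustrative only.

```java
// Checkpoint throughput: state size divided by end-to-end duration.
final class CheckpointRate {
    // State size in GB divided by duration in minutes.
    static double gbPerMinute(double stateSizeGb, int hours, int minutes, int seconds) {
        double durationMinutes = hours * 60 + minutes + seconds / 60.0;
        return stateSizeGb / durationMinutes;
    }

    public static void main(String[] args) {
        // Checkpoints 4, 6, and 12 from the history above.
        System.out.printf("ID 4:  %.2f GB/min%n", gbPerMinute(12.7, 0, 0, 56));
        System.out.printf("ID 6:  %.2f GB/min%n", gbPerMinute(17.4, 0, 11, 11));
        System.out.printf("ID 12: %.2f GB/min%n", gbPerMinute(60.5, 1, 44, 55));
    }
}
```

That works out to roughly 13.6 GB/minute for checkpoint 4, 1.56 for checkpoint 6, and 0.58 for checkpoint 12.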


I'm pleased that it hasn't crashed yet.  Yet I'm concerned that with the 
checkpoint GB/minute getting so slow, it will crash soon.


I'm really wishing state.backend.async=false worked for RocksDBStateBackend.

I'm also wondering if my throttler would improve if I just connected to 
the REST api to ask if any checkpoint is in progress, and then paused 
inputs accordingly.  Effectively state.backend.async=false via hacked 
application code.
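
A rough sketch of the check such a throttler would need, in plain Java. It assumes the checkpoint statistics returned by GET /jobs/:jobid/checkpoints contain a counts object with an "in_progress" field; that field name is from memory of the Flink REST documentation and should be verified. The class name is hypothetical, and a real implementation would use a JSON parser rather than a regular expression.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Decides whether inputs should pause, given the checkpoint statistics JSON.
final class CheckpointGate {
    // Assumed field name in the checkpoint statistics response.
    private static final Pattern IN_PROGRESS =
            Pattern.compile("\"in_progress\"\\s*:\\s*(\\d+)");

    // True if the JSON reports at least one in-progress checkpoint.
    static boolean checkpointInProgress(String checkpointsJson) {
        Matcher m = IN_PROGRESS.matcher(checkpointsJson);
        return m.find() && Integer.parseInt(m.group(1)) > 0;
    }
}
```

The throttler would poll the endpoint periodically from a background thread and hold back inputs while this returns true.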


> Where are you updating your state here? I
> couldn't find any flink managed state here.

The only updates to state I make are through the built-in 
DataStream.cogroup.  A unit test (without RocksDB loaded) of the way I 
use .cogroup shows exactly two ways that .cogroup calls an 
implementation of AppendingState.add.  I summarize those below.


The two AppendingState subclasses invoked are HeapListState and 
HeapReducingState.  Neither has a support attribute on it, such as 
MapState's @PublicEvolving.


> I suggested updating the flink managed state using onTimer over an
> interval equal to the checkpoint interval.

So the onTimer method, with interval set to the checkpoint interval. 
Interesting.


It looks like the closest subclass for my use case would be 
KeyedCoProcessFunction.  Let me see if I understand concretely the idea:


1) between checkpoints, read join input and write join output, by 
loading any state reads from external state, but buffering all state 
changes in memory in some kind of data structure.


2) whenever a checkpoint arrives or the memory consumed by buffered 
writes gets too big, flush the writes to state.


Is that the gist of the idea about .onTimer?
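
To make the two steps concrete, here is a minimal sketch of the buffering idea in plain Java. A plain Map stands in for Flink keyed state, and the class and its methods are hypothetical; in a real KeyedCoProcessFunction the flush would be driven from onTimer, and buffered writes that have not been flushed when a snapshot is taken would not be part of that snapshot.

```java
import java.util.HashMap;
import java.util.Map;

// Write-behind buffer: reads go through the buffer, writes reach the store only on flush.
final class WriteBehindBuffer<K, V> {
    private final Map<K, V> store;                       // stand-in for the state backend
    private final Map<K, V> pending = new HashMap<>();   // buffered state changes
    private final int maxPending;                        // flush early above this size

    WriteBehindBuffer(Map<K, V> store, int maxPending) {
        this.store = store;
        this.maxPending = maxPending;
    }

    // Reads see buffered writes first, then fall back to the store.
    V get(K key) {
        V buffered = pending.get(key);
        return buffered != null ? buffered : store.get(key);
    }

    // Buffers the write; flushes early if the buffer grows too large.
    void put(K key, V value) {
        pending.put(key, value);
        if (pending.size() >= maxPending) {
            flush();
        }
    }

    // Called on the periodic timer: one store write per changed key.
    void flush() {
        store.putAll(pending);
        pending.clear();
    }

    int pendingSize() {
        return pending.size();
    }
}
```

Repeated updates to the same key between flushes collapse into a single write to the store, which is the saving the suggestion is after.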


Jeff



There are two paths from .coGroup to AppendingState.add

path 1 of 2: .coGroup to HeapListState

add:90, HeapListState {org.apache.flink.runtime.state.heap}
processElement:203, EvictingWindowOperator {org.apache.flink.streaming.runtime.operators.windowing}
processElement:164, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}
processInput:143, StreamOneInputProcessor {org.apache.flink.streaming.runtime.io}



org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator#processElement

  (windowAssigner is an instance of GlobalWindows)

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            . . .
        } else {
            for (W window : elementWindows) {

   

Re: Trouble with large state

2020-06-22 Thread Jeff Henrikson

Bhaskar,

I think I am unstuck.  The performance numbers I sent after throttling 
were due to a one character error in business logic.  I think I now have 
something good enough to work with for now.  I will repost if I 
encounter further unexpected issues.


Adding application-level throttling ends up resolving both my symptom of 
slow/failing checkpoints, and also my symptom of crashes during long runs.


Many thanks!


Jeff



Re: Safer handling of Scala immutable collections

2020-10-14 Thread Jeff Zhang
Could you share your code to reproduce it ?

On Thu, Oct 15, 2020 at 5:54 AM, Rex Fenley wrote:

> Hello,
>
> I've been playing with UDFs using the Scala API and have repeatedly run
> into issues such as this:
> ```
> flink-taskmanager_1| java.lang.ClassCastException:
> scala.collection.immutable.Set$EmptySet$ cannot be cast to [J
> ```
> Is there something that can be done on Flink's end, either to catch these
> errors in type checking or to cast them in a sane manner during runtime?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 
Best Regards

Jeff Zhang


Can flink 1.9.1 use flink-shaded 2.8.3-1.8.2

2019-10-25 Thread Jeff Zhang
Hi all,

There's no new flink-shaded release for flink 1.9, so I'd like to confirm:
can flink 1.9.1 use flink-shaded 2.8.3-1.8.2? Or is flink-shaded
no longer necessary from flink 1.9 onwards?

https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2

-- 
Best Regards

Jeff Zhang


Re: Can flink 1.9.1 use flink-shaded 2.8.3-1.8.2

2019-10-25 Thread Jeff Zhang
Thanks Chesnay. Is there any document that explains which version of
flink-shaded-hadoop-jar I should use for a specific version of flink?
E.g. the document for flink 1.9 here
https://flink.apache.org/downloads.html#apache-flink-191 points me to
flink-shaded-hadoop-jar 7.0, but the latest version
of flink-shaded-hadoop-jar is 8.0, so when should I
use flink-shaded-hadoop-jar 8.0?

Another question is whether flink-shaded-hadoop-2-uber 2.8.3-7.0 could
be used for hadoop 2.8.5 as well. I believe so, but want to confirm it.
And if it works for all hadoop 2.8.x, then it may make more sense to omit
the hadoop minor version, e.g. name it 2.8-7.0; otherwise it may
confuse users.


On Fri, Oct 25, 2019 at 4:21 PM, Chesnay Schepler wrote:

> If you need hadoop, but the approach outlined here
> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/projectsetup/dependencies.html>
> doesn't work for you, then you still need a flink-shaded-hadoop-jar that
> you can download here
> <https://flink.apache.org/downloads.html#apache-flink-191>.
>
> On 25/10/2019 09:54, Jeff Zhang wrote:
>
> Hi all,
>
> There's no new flink shaded release for flink 1.9, so I'd like to confirm
> with you can flink 1.9.1 use flink-shaded 2.8.3-1.8.2 ? Or the flink shaded
> is not necessary for flink 1.9 afterwards ?
>
> https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Jeff Zhang
Great work, Hequn

On Thu, Dec 12, 2019 at 2:32 PM, Dian Fu wrote:

> Thanks Hequn for being the release manager and everyone who contributed to
> this release.
>
> Regards,
> Dian
>
> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
>
> Hi,
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.8.3, which is the third bugfix release for the Apache Flink 1.8
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12346112
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
> Many thanks to @Jincheng for being a mentor during this release.
>
> Regards,
> Hequn
>
>
>

-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] have separate Flink distributions with built-in Hive dependencies

2019-12-13 Thread Jeff Zhang
+1, this is definitely necessary for a better user experience. Setting up the
environment is always painful with many big data tools.



Bowen Li wrote on Fri, Dec 13, 2019 at 5:02 PM:

> cc user ML in case anyone wants to chime in
>
> On Fri, Dec 13, 2019 at 00:44 Bowen Li  wrote:
>
>> Hi all,
>>
>> I'd like to propose having a couple of separate Flink distributions with Hive
>> dependencies for specific Hive versions (2.3.4 and 1.2.1). The distributions
>> would be provided to users on the Flink download page [1].
>>
>> A few reasons to do this:
>>
>> 1) Flink-Hive integration is important to many Flink and Hive users
>> in two dimensions:
>>  a) for Flink metadata: HiveCatalog is the only persistent catalog to
>> manage Flink tables. With Flink 1.10 supporting more DDL, the persistent
>> catalog will play an even more critical role in users' workflows
>>  b) for Flink data: the Hive data connector (source/sink) helps both
>> Flink and Hive users unlock new use cases in streaming,
>> near-realtime/realtime data warehouse, backfill, etc.
>>
>> 2) currently users have to go through a *really* tedious process to get
>> started, because it requires lots of extra jars (see [2]) that are absent
>> from Flink's lean distribution. We've had many users from the public mailing
>> list, private email, and DingTalk groups who got frustrated after spending
>> lots of time figuring out the jars themselves. They would rather have an
>> out-of-the-box quickstart experience, and play with the catalog and
>> source/sink without hassle.
>>
>> 3) it's easier for users to replace those Hive dependencies for their own
>> Hive versions - just replace those jars with the right versions and no need
>> to find the doc.
>>
>> * Hive 2.3.4 and 1.2.1 are two versions that represent a large part of the
>> user base out there, and that's why we are using them as examples for
>> dependencies in [1] even though we now support almost all Hive versions [3].
>>
>> I want to hear what the community thinks about this, and how to achieve it
>> if we believe that's the way to go.
>>
>> Cheers,
>> Bowen
>>
>> [1] https://flink.apache.org/downloads.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/#dependencies
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/#supported-hive-versions
>>
>

-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-03 Thread Jeff Zhang
+1, I have already made Blink the default planner of the Flink interpreter
in Zeppelin.
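
For reference, the SQL Client side of this proposal comes down to one entry in the environment file. The fragment below is a sketch of the relevant part of sql-client-defaults.yaml as commonly seen in the 1.10 line; treat the surrounding keys as an assumption and check the file linked as [5] in Jark's mail quoted below.

```yaml
execution:
  # planner used for table programs: 'old' or 'blink'
  planner: blink
  # 'batch' or 'streaming' execution
  type: streaming
```

A user-supplied environment file can still set the planner back to the old one, which is why the quoted proposal describes the change as compatible with existing setups.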


Jingsong Li wrote on Fri, Jan 3, 2020 at 4:37 PM:

> Hi Jark,
>
> +1 for making the Blink planner the default in SQL-CLI.
> I believe this new planner can be put into practice in production.
> We've worked hard on it for nearly a year, while the old planner has not
> moved on.
>
> And I'd like to cc user@flink.apache.org.
> If anyone finds that the Blink planner has any significant defects or a
> larger regression compared with the old planner, please let us know. We
> will be very grateful.
>
> Best,
> Jingsong Lee
>
> On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:
>
>> +1 for this.
>> We bring many SQL/API features and stability enhancements in the 1.10
>> release, and almost all of them happen in the Blink planner.
>> SQL CLI is the most convenient entrypoint for me. I believe many users
>> will have a better experience if we set the Blink planner as the default.
>>
>> Best,
>> Leonard
>>
>> > On Jan 3, 2020, at 15:16, Terry Wang wrote:
>> >
>> > Since what blink planner can do is a superset of flink planner, big +1
>> for changing the default planner to Blink planner from my side.
>> >
>> > Best,
>> > Terry Wang
>> >
>> >
>> >
>> >> On Jan 3, 2020, at 15:00, Jark Wu wrote:
>> >>
>> >> Hi everyone,
>> >>
>> >> In the 1.10 release, Flink SQL supports many awesome features and
>> >> improvements, including:
>> >> - support for watermark statements and computed columns in DDL
>> >> - full support for all data types in Hive
>> >> - Batch SQL performance improvements (TPC-DS 7x faster than Hive MR)
>> >> - support for INSERT OVERWRITE and INSERT PARTITION
>> >>
>> >> However, all these features and improvements are only available in the
>> >> Blink planner, not in the old planner.
>> >> There are also some other features that are limited to the Blink planner,
>> >> e.g. Dimension Table Join [1], TopN [2], Deduplicate [3], streaming
>> >> aggregates optimization [4], and so on.
>> >>
>> >> But the old planner is still the default planner in Table API & SQL. It is
>> >> frustrating for users to have to switch to the Blink planner manually
>> >> every time they start a SQL CLI. And it's surprising to see an unsupported
>> >> exception if they try out the new features without switching planners.
>> >>
>> >> SQL CLI is a very important entrypoint for users to try out new features
>> >> and do prototyping.
>> >> In order to give the new planner more exposure, I would like to suggest
>> >> setting the default planner for SQL Client to the Blink planner before the
>> >> 1.10 release.
>> >>
>> >> The approach is just to change the default SQL CLI yaml configuration [5].
>> >> In this way, existing environments remain compatible and unaffected.
>> >>
>> >> Changing the default planner for the whole Table API & SQL is another topic
>> >> and is out of scope of this discussion.
>> >>
>> >> What do you think?
>> >>
>> >> Best,
>> >> Jark
>> >>
>> >> [1]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>> >> [2]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#top-n
>> >> [3]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>> >> [4]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html
>> >> [5]:
>> >>
>> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/conf/sql-client-defaults.yaml#L100
>> >
>>
>>
>
> --
> Best, Jingsong Lee
>


-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 Thread Jeff Zhang
Congrats, Dian Fu!

jincheng sun wrote on Thu, Jan 16, 2020 at 5:58 PM:

> Hi everyone,
>
> I'm very happy to announce that Dian accepted the offer of the Flink PMC
> to become a committer of the Flink project.
>
> Dian Fu has been contributing to Flink for many years. Dian Fu played an
> essential role in PyFlink/CEP/SQL/Table API modules. Dian Fu has
> contributed several major features, reported and fixed many bugs, spent a
> lot of time reviewing pull requests, and also frequently helped out on the
> user mailing lists and checked/voted on releases.
>
> Please join me in congratulating Dian on becoming a Flink committer!
>
> Best,
> Jincheng(on behalf of the Flink PMC)
>


-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread Jeff Zhang
+1


Xingbo Huang wrote on Tue, Feb 4, 2020 at 1:07 PM:

> Hi Jincheng,
>
> Thanks for driving this.
> +1 for this proposal.
>
> Compared to building from source, downloading directly from PyPI will
> greatly reduce the development cost for Python users.
>
> Best,
> Xingbo
>
>
>
> Wei Zhong wrote on Tue, Feb 4, 2020 at 12:43 PM:
>
>> Hi Jincheng,
>>
>> Thanks for bringing up this discussion!
>>
>> +1 for this proposal. Building from source takes a long time and requires a
>> good network environment. Some users may not have such an environment.
>> Uploading to PyPI will greatly improve the user experience.
>>
>> Best,
>> Wei
>>
>> jincheng sun wrote on Tue, Feb 4, 2020 at 11:49 AM:
>>
>>> Hi folks,
>>>
>>> I am very happy to have received some user inquiries about the use of the
>>> Flink Python API (PyFlink) recently. One of the more common questions is
>>> whether it is possible to install PyFlink without building from source.
>>> The most convenient and natural way for users is to use
>>> `pip install apache-flink`.
>>> We originally planned to support `pip install apache-flink` in
>>> Flink 1.10; the reason for this decision was that when Flink 1.9 was
>>> released on August 22, 2019 [1], Flink's PyPI account was not ready.
>>> Our PyPI account has been available since October 09, 2019 [2] (only the
>>> PMC can access it), so for the convenience of users I propose:
>>>
>>> - Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
>>> - Update Flink 1.9 documentation to add support for `pip install`.
>>>
>>> As we all know, Flink 1.9.2 was just released on January 31, 2020 [3].
>>> There are still at least 1 to 2 months before the release of 1.9.3, so
>>> my proposal is made purely from the perspective of user
>>> convenience. Although the proposed work is not large, we have not set a
>>> precedent for an independent release of the Flink Python API (PyFlink) in
>>> the previous release process. I hereby initiate this discussion and look
>>> forward to your feedback!
>>>
>>> Best,
>>> Jincheng
>>>
>>> [1]
>>>
>>> https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
>>> [2]
>>>
>>> https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
>>> [3]
>>>
>>> https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E
>>>
>>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread Jeff Zhang
Congratulations! I really appreciate your hard work.

Yangze Guo wrote on Thu, Feb 13, 2020 at 9:29 AM:

> Thanks, Gary & Yu. Congrats to everyone involved!
>
> Best,
> Yangze Guo
>
> On Thu, Feb 13, 2020 at 9:23 AM Jingsong Li 
> wrote:
> >
> > Congratulations! Great work.
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, Feb 12, 2020 at 11:05 PM Leonard Xu  wrote:
> >>
> >> Great news!
> >> Thanks everyone involved !
> >> Thanks Gary and Yu for being the release manager !
> >>
> >> Best,
> >> Leonard Xu
> >>
> >> On Feb 12, 2020, at 23:02, Stephan Ewen wrote:
> >>
> >> Congrats to us all.
> >>
> >> A big piece of work, nicely done.
> >>
> >> Let's hope that this helps our users make their existing use cases
> easier and also opens up new use cases.
> >>
> >> On Wed, Feb 12, 2020 at 3:31 PM 张光辉  wrote:
> >>>
> >>> Great work.
> >>>
> >>> Congxian Qiu wrote on Wed, Feb 12, 2020 at 10:11 PM:
> >>>>
> >>>> Great work.
> >>>> Thanks everyone involved.
> >>>> Thanks Gary and Yu for being the release manager
> >>>>
> >>>>
> >>>> Best,
> >>>> Congxian
> >>>>
> >>>>
> >>>> Jark Wu wrote on Wed, Feb 12, 2020 at 9:46 PM:
> >>>>>
> >>>>> Congratulations to everyone involved!
> >>>>> Great thanks to Yu & Gary for being the release manager!
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>> On Wed, 12 Feb 2020 at 21:42, Zhu Zhu  wrote:
> >>>>>>
> >>>>>> Cheers!
> >>>>>> Thanks Gary and Yu for the great job as release managers.
> >>>>>> And thanks to everyone whose contribution makes the release
> possible!
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Zhu Zhu
> >>>>>>
> >>>>>> Wyatt Chun wrote on Wed, Feb 12, 2020 at 9:36 PM:
> >>>>>>>
> >>>>>>> Sounds great. Congrats & Thanks!
> >>>>>>>
> >>>>>>> On Wed, Feb 12, 2020 at 9:31 PM Yu Li  wrote:
> >>>>>>>>
> >>>>>>>> The Apache Flink community is very happy to announce the release
> of Apache Flink 1.10.0, which is the latest major release.
> >>>>>>>>
> >>>>>>>> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
> >>>>>>>>
> >>>>>>>> The release is available for download at:
> >>>>>>>> https://flink.apache.org/downloads.html
> >>>>>>>>
> >>>>>>>> Please check out the release blog post for an overview of the
> improvements for this new major release:
> >>>>>>>> https://flink.apache.org/news/2020/02/11/release-1.10.0.html
> >>>>>>>>
> >>>>>>>> The full release notes are available in Jira:
> >>>>>>>>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845
> >>>>>>>>
> >>>>>>>> We would like to thank all contributors of the Apache Flink
> community who made this release possible!
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Gary & Yu
> >>
> >>
> >
> >
> > --
> > Best, Jingsong Lee
>


-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-20 Thread Jeff Zhang
Congratulations, Jingsong! You deserve it.

wenlong.lwl wrote on Fri, Feb 21, 2020 at 11:43 AM:

> Congrats Jingsong!
>
> On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:
>
> > Congrats Jingsong!
> >
> > > On Feb 21, 2020, at 11:39 AM, Jark Wu wrote:
> > >
> > > Congratulations Jingsong! Well deserved.
> > >
> > > Best,
> > > Jark
> > >
> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
> > >
> > >> Congratulations! Jingsong
> > >>
> > >>
> > >> Best,
> > >> Dan Zou
> > >>
> >
> >
>


-- 
Best Regards

Jeff Zhang


Re: SHOW CREATE TABLE in Flink SQL

2020-03-02 Thread Jeff Zhang
+1 for this. Maybe we can add 'describe extended table', like Hive has.

Gyula Fóra wrote on Mon, Mar 2, 2020 at 8:49 PM:

> Hi All!
>
> I am looking for the functionality to show how a table was created or show
> all the properties (connector, etc.)
>
> I could only find DESCRIBE at this point which only shows the schema.
>
> Is there anything similar to "SHOW CREATE TABLE" or is this something that
> we should maybe add in the future?
>
> Thank you!
> Gyula
>


-- 
Best Regards

Jeff Zhang


Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jeff Zhang
Hi Gyula,

I am working on integrating Flink with Zeppelin. One feature of Zeppelin is that
users can override any setting in flink-conf.yaml (actually, any setting listed
here:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html).
Of course you can run Flink SQL in Zeppelin, and you can also leverage other
features of Zeppelin, like visualization.

If you are interested, you could try the master branch of Zeppelin + this
improvement PR

https://github.com/apache/zeppelin
https://github.com/apache/zeppelin/pull/3676
https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md
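
As background on why the SQL Client route fails: Gyula's original mail, quoted at the bottom of this message, notes that the YAML keys are normalized (lowercased), so case-sensitive options such as -yD cannot be passed. A minimal Python sketch of that effect follows; normalize_keys is a hypothetical stand-in, not Flink's actual code.

```python
# Illustrative sketch: normalize_keys is a hypothetical stand-in for the
# key normalization described in the thread, not Flink's actual code.

def normalize_keys(section):
    """Lowercase every key, as the thread says the SQL Client does."""
    return {key.lower(): value for key, value in section.items()}


if __name__ == "__main__":
    deployment = {"yD": "execution.target=yarn-per-job"}
    print(normalize_keys(deployment))
```

After normalization the key reads yd, so the original case-sensitive option name can no longer be recovered from it.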






Gyula Fóra wrote on Thu, Mar 5, 2020 at 6:51 PM:

> I could basically list a few things I want to set (execution.target for
> example), but it's fair to assume that I would like to be able to set
> anything :)
>
> Gyula
>
> On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li 
> wrote:
>
>> Hi Gyula,
>>
>> Maybe the Blink planner has invoked "StreamExecutionEnvironment.configure";
>> which planner do you use?
>>
>> But "StreamExecutionEnvironment.configure" only covers part of the
>> configuration, not everything in flink-conf.yaml.
>> So which config do you want to set? I know some configs like
>> "taskmanager.network.blocking-shuffle.compression.enabled" cannot be set.
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:
>>
>>> Hi Gyula,
>>>
>>> Flink configurations can be overridden via
>>> `TableConfig#getConfiguration()`; however, SQL CLI only allows setting Table
>>> specific configs.
>>> I consider this a bug/improvement of SQL CLI which should be fixed in
>>> 1.10.1.
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 5 Mar 2020 at 18:12, Gyula Fóra  wrote:
>>>
>>>> Thanks Caizhi,
>>>>
>>>> This seems like a pretty big shortcoming for any multi-user/multi-app
>>>> environment. I will open a jira for this.
>>>>
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng 
>>>> wrote:
>>>>
>>>>> Hi Gyula.
>>>>>
>>>>> I'm afraid there is no way to override all Flink configurations
>>>>> currently. SQL client yaml file can only override some of the Flink
>>>>> configurations.
>>>>>
>>>>> Configuration entries indeed can only set Table specific configs,
>>>>> while deployment entries are used to set the result fetching address and
>>>>> port. There is currently no way to change the execution target from the
>>>>> SQL client.
>>>>>
>>>>> Gyula Fóra wrote on Thu, Mar 5, 2020 at 4:31 PM:
>>>>>
>>>>>> Hi All!
>>>>>>
>>>>>> I am trying to understand if there is any way to override flink
>>>>>> configuration parameters when starting the SQL Client.
>>>>>>
>>>>>> It seems that the only way to pass any parameters is through the
>>>>>> environment yaml.
>>>>>>
>>>>>> There I found 2 possible routes:
>>>>>>
>>>>>> configuration: this doesn't work as it only sets Table specific
>>>>>> configs apparently, but maybe I am wrong.
>>>>>>
>>>>>> deployment: I tried using dynamic properties options here but
>>>>>> unfortunately we normalize (lowercase) the YAML keys so it is impossible
>>>>>> to pass options like -yD or -D.
>>>>>>
>>>>>> Does anyone have any suggestions?
>>>>>>
>>>>>> Thanks
>>>>>> Gyula
>>>>>>
>>>>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 
Best Regards

Jeff Zhang


Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jeff Zhang
There are 2 kinds of configuration: job level & cluster level. I am afraid we
don't have documentation that differentiates them, so it depends on how users
understand these configurations. We may need to improve the documentation on that.
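
A minimal Python sketch of the distinction, assuming an already running cluster as in Kurt's mail quoted below. The key set is a hand-picked, incomplete illustration and the function name is hypothetical; it is not an official Flink classification.

```python
# Illustrative sketch only. CLUSTER_LEVEL_KEYS is a hand-picked, incomplete
# example set, not an official Flink classification.
CLUSTER_LEVEL_KEYS = {
    "taskmanager.memory.process.size",   # TM memory: fixed once the process runs
    "taskmanager.numberOfTaskSlots",     # slot count: fixed once the process runs
}


def split_overrides(overrides, cluster_running):
    """Return (applied, rejected) overrides for one job submission."""
    applied, rejected = {}, {}
    for key, value in overrides.items():
        if cluster_running and key in CLUSTER_LEVEL_KEYS:
            rejected[key] = value   # cannot change a running process
        else:
            applied[key] = value
    return applied, rejected


if __name__ == "__main__":
    requested = {
        "taskmanager.numberOfTaskSlots": "4",
        "parallelism.default": "8",
    }
    applied, rejected = split_overrides(requested, cluster_running=True)
    print("applied:", applied)
    print("rejected:", rejected)
```

Reporting the rejected keys separately, rather than silently dropping them, is one way to avoid the confusing partial-override experience described further down in this thread.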

Kurt Young wrote on Fri, Mar 6, 2020 at 8:34 AM:

> If you already have a running Flink cluster and you want to submit another
> job to this cluster, then all the configurations
> related to process parameters like TM memory, slot number, etc. cannot be
> modified.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 11:08 PM Gyula Fóra  wrote:
>
>> Kurt, can you please explain which conf parameters you mean?
>>
>> In regular executions (Yarn for instance) we have dynamic config
>> parameters overriding any flink-conf argument.
>> So it is not about setting them in the user code; it should happen
>> before the ClusterDescriptors are created (i.e. together with the
>> CustomCommandLine logic)
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:49 PM Kurt Young  wrote:
>>
>>> IIRC the tricky thing here is that not all the config options belonging to
>>> flink-conf.yaml can be adjusted dynamically in the user's program.
>>> So we would end up with some configurations that can be overridden and
>>> some that cannot. That experience is not great for users.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:15 PM Jeff Zhang  wrote:
>>>
>>>> Hi Gyula,
>>>>
>>>> I am working on integrating Flink with Zeppelin. One feature of Zeppelin is
>>>> that users can override any setting in flink-conf.yaml (actually, any
>>>> setting listed here:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html).
>>>> Of course you can run Flink SQL in Zeppelin, and you can also leverage other
>>>> features of Zeppelin, like visualization.
>>>>
>>>> If you are interested, you could try the master branch of Zeppelin +
>>>> this improvement PR
>>>>
>>>> https://github.com/apache/zeppelin
>>>> https://github.com/apache/zeppelin/pull/3676
>>>> https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Gyula Fóra wrote on Thu, Mar 5, 2020 at 6:51 PM:
>>>>
>>>>> I could basically list a few things I want to set (execution.target
>>>>> for example), but it's fair to assume that I would like to be able to set
>>>>> anything :)
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li 
>>>>> wrote:
>>>>>
>>>>>> Hi Gyula,
>>>>>>
>>>>>> Maybe the Blink planner has invoked
>>>>>> "StreamExecutionEnvironment.configure"; which planner do you use?
>>>>>>
>>>>>> But "StreamExecutionEnvironment.configure" only covers part of the
>>>>>> configuration, not everything in flink-conf.yaml.
>>>>>> So which config do you want to set? I know some configs like
>>>>>> "taskmanager.network.blocking-shuffle.compression.enabled" cannot be
>>>>>> set.
>>>>>>
>>>>>> Best,
>>>>>> Jingsong Lee
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:
>>>>>>
>>>>>>> Hi Gyula,
>>>>>>>
>>>>>>> Flink configurations can be overridden via
>>>>>>> `TableConfig#getConfiguration()`; however, SQL CLI only allows setting
>>>>>>> Table specific configs.
>>>>>>> I consider this a bug/improvement of SQL CLI which should be
>>>>>>> fixed in 1.10.1.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Thu, 5 Mar 2020 at 18:12, Gyula Fóra 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Caizhi,
>>>>>>>>
>>>>>>>> This seems like a pretty big shortcoming for any
>>>>>>>> multi-user/multi-app environment. I will open a jira for this.
>>>>>>>>
>>>>>>>> Gyula
>>>>>>>>
>>

Re: Scala Shell gives error "rest.address must be set"

2020-03-17 Thread Jeff Zhang
It looks like you are running in standalone mode. What command do you
use to start the Scala shell?

Craig Foster wrote on Wed, Mar 18, 2020 at 5:23 AM:

> Hi:
> When I upgraded from Flink 1.9.1 to Flink 1.10.0 I can't execute
> programs at the Scala shell.
>
> It gives me an error that the REST address must be set. This looks
> like it comes from HA but I don't have HA configured for Flink and it
> was very hard to find this documented other than in the PR/JIRA in the
> history so don't have much context. Can someone point me to how to
> configure this properly? For reference, I put the example stacktrace
> below.
>
> scala> val text = benv.fromElements("To be, or not to be,--that is the
> question:--");
> text: org.apache.flink.api.scala.DataSet[String] =
> org.apache.flink.api.scala.DataSet@2396408a
>
> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+")}.map {
> (_, 1) }.groupBy(0).sum(1);
> counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] =
> org.apache.flink.api.scala.AggregateDataSet@38bce2ed
>
> scala> counts.print()
> 20/03/17 21:15:34 INFO java.ExecutionEnvironment: The job has 0
> registered types and 0 default Kryo serializers
> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
> configuration property: env.yarn.conf.dir, /etc/hadoop/conf
> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
> configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
> java.lang.RuntimeException: Couldn't retrieve standalone cluster
>   at
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:53)
>   at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:64)
>   at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:944)
>   at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:860)
>   at
> org.apache.flink.api.java.ScalaShellEnvironment.execute(ScalaShellEnvironment.java:81)
>   at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:844)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
>   ... 30 elided
> Caused by: java.lang.NullPointerException: rest.address must be set
>   at
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
>   at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:196)
>   at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createClientHAService(HighAvailabilityServicesUtils.java:146)
>   at
> org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:161)
>   at
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:51)
>   ... 38 more
>


-- 
Best Regards

Jeff Zhang


Re: Scala Shell gives error "rest.address must be set"

2020-03-17 Thread Jeff Zhang
I agree, this is really confusing for users. Would you mind creating a ticket
for that?

Craig Foster wrote on Wed, Mar 18, 2020 at 8:36 AM:

> If I specify these options, it seems to work...but I thought I could
> have this dynamically determined when submitting jobs just using the
> "yarn" option:
>
> /usr/lib/flink/bin/start-scala-shell.sh yarn -s 4 -jm 1024m -tm 4096m
>
> I guess what isn't clear here to me is that if you use `yarn` alone
> there needs to be an existing yarn cluster already started.
>
>
> On Tue, Mar 17, 2020 at 4:22 PM Craig Foster 
> wrote:
> >
> > Yeah, I was wondering about that. I'm using
> > `/usr/lib/flink/bin/start-scala-shell.sh yarn`-- previously I'd use
> > `/usr/lib/flink/bin/start-scala-shell.sh yarn -n ${NUM}`
> >  but that deprecated option was removed.
> >
> >
> > On Tue, Mar 17, 2020 at 4:11 PM Jeff Zhang  wrote:
> > >
> > > It looks like you are running in standalone mode. What command do you
> > > use to start the Scala shell?
> > >
> > > Craig Foster wrote on Wed, Mar 18, 2020 at 5:23 AM:
> > >>
> > >> Hi:
> > >> When I upgraded from Flink 1.9.1 to Flink 1.10.0 I can't execute
> > >> programs at the Scala shell.
> > >>
> > >> It gives me an error that the REST address must be set. This looks
> > >> like it comes from HA but I don't have HA configured for Flink and it
> > >> was very hard to find this documented other than in the PR/JIRA in the
> > >> history so don't have much context. Can someone point me to how to
> > >> configure this properly? For reference, I put the example stacktrace
> > >> below.
> > >>
> > >> scala> val text = benv.fromElements("To be, or not to be,--that is the
> > >> question:--");
> > >> text: org.apache.flink.api.scala.DataSet[String] =
> > >> org.apache.flink.api.scala.DataSet@2396408a
> > >>
> > >> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+")}.map {
> > >> (_, 1) }.groupBy(0).sum(1);
> > >> counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] =
> > >> org.apache.flink.api.scala.AggregateDataSet@38bce2ed
> > >>
> > >> scala> counts.print()
> > >> 20/03/17 21:15:34 INFO java.ExecutionEnvironment: The job has 0
> > >> registered types and 0 default Kryo serializers
> > >> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
> > >> configuration property: env.yarn.conf.dir, /etc/hadoop/conf
> > >> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
> > >> configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
> > >> java.lang.RuntimeException: Couldn't retrieve standalone cluster
> > >>   at
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:53)
> > >>   at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:64)
> > >>   at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:944)
> > >>   at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:860)
> > >>   at
> org.apache.flink.api.java.ScalaShellEnvironment.execute(ScalaShellEnvironment.java:81)
> > >>   at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:844)
> > >>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> > >>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
> > >>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
> > >>   ... 30 elided
> > >> Caused by: java.lang.NullPointerException: rest.address must be set
> > >>   at
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> > >>   at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:196)
> > >>   at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createClientHAService(HighAvailabilityServicesUtils.java:146)
> > >>   at
> org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:161)
> > >>   at
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:51)
> > >>   ... 38 more
> > >
> > >
> > >
> > > --
> > > Best Regards
> > >
> > > Jeff Zhang
>


-- 
Best Regards

Jeff Zhang


Re: [Third-party Tool] Flink memory calculator

2020-03-29 Thread Jeff Zhang
Hi Yangze,

Does this tool just parse the configuration in flink-conf.yaml? Maybe it
could be done in JobListener [1] (we would need to enhance it by adding a hook
before job submission), so that it could cover all the cases (e.g. parameters
coming from the command line).

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L35
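
For readers unfamiliar with what such a calculator derives, here is a minimal Python sketch of the Flink 1.10 memory breakdown as described in the setup guide linked as [1] in the quoted announcement below: total Flink memory is the sum of framework heap, task heap, managed, framework off-heap, task off-heap and network memory, and total process memory adds JVM metaspace and JVM overhead. The function names are hypothetical and the sizes are made-up example values, not Flink defaults.

```python
# Illustrative sketch of the Flink 1.10 memory breakdown; all sizes are
# made-up example values in MiB, not Flink defaults.

def total_flink_memory(c):
    """Framework/task heap + managed + framework/task off-heap + network."""
    return (c["framework_heap"] + c["task_heap"] + c["managed"]
            + c["framework_off_heap"] + c["task_off_heap"] + c["network"])


def total_process_memory(c):
    """Total Flink memory plus JVM metaspace and JVM overhead."""
    return total_flink_memory(c) + c["jvm_metaspace"] + c["jvm_overhead"]


if __name__ == "__main__":
    components = {
        "framework_heap": 128, "task_heap": 1024, "managed": 1024,
        "framework_off_heap": 128, "task_off_heap": 0, "network": 256,
        "jvm_metaspace": 96, "jvm_overhead": 256,
    }
    print("total Flink memory (MiB):", total_flink_memory(components))
    print("total process memory (MiB):", total_process_memory(components))
```

The sketch only adds up components that are already known. The actual tool reuses Flink's own BashJavaUtils, so it also derives missing values and reports conflicting settings, which this example does not attempt.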


Yangze Guo wrote on Mon, Mar 30, 2020 at 9:40 AM:

> Hi, Yun,
>
> I'm sorry that the tool currently cannot handle that. But I think it is a
> really good idea, and the feature will be added in the next version.
>
> Best,
> Yangze Guo
>
> On Mon, Mar 30, 2020 at 12:21 AM Yun Tang  wrote:
> >
> > Very interesting and convenient tool, just a quick question: could this
> tool also handle deployment cluster commands like "-tm" mixed with
> configuration in `flink-conf.yaml` ?
> >
> > Best
> > Yun Tang
> > 
> > From: Yangze Guo 
> > Sent: Friday, March 27, 2020 18:00
> > To: user ; user...@flink.apache.org <
> user...@flink.apache.org>
> > Subject: [Third-party Tool] Flink memory calculator
> >
> > Hi, there.
> >
> > In release-1.10, the memory setup of task managers has changed a lot.
> > I would like to provide here a third-party tool to simulate and get
> > the calculation result of Flink's memory configuration.
> >
> > Although there are already a detailed setup guide [1] and a migration
> > guide [2] in the official docs, the calculator further allows users to:
> > - Verify if there is any conflict in their configuration. The
> > calculator is more lightweight than starting a Flink cluster,
> > especially when running Flink on Yarn/Kubernetes. User could make sure
> > their configuration is correct locally before deploying it to external
> > resource managers.
> > - Get all of the memory configurations before deploying. User may set
> > taskmanager.memory.task.heap.size and taskmanager.memory.managed.size.
> > But they also want to know the total memory consumption of Flink. With
> > this tool, users could get all of the memory configurations they are
> > interested in. If anything is unexpected, they would not need to
> > re-deploy a Flink cluster.
> >
> > The repo link of this tool is
> > https://github.com/KarmaGYZ/flink-memory-calculator. It reuses the
> > BashJavaUtils.jar of Flink and ensures the calculation result is
> > exactly the same as your Flink dist. For more details, please take a
> > look at the README.
> >
> > Any feedback or suggestion is welcome!
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_setup.html
> > [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_migration.html
> >
> > Best,
> > Yangze Guo
>


-- 
Best Regards

Jeff Zhang


[ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Jeff Zhang
Hi Folks,

I am very excited to announce that the integration work of Flink on the Apache
Zeppelin notebook is completed. You can now run Flink jobs via the DataStream
API, Table API, SQL, and PyFlink in the Apache Zeppelin notebook. Download
it here: http://zeppelin.apache.org/download.html

Here are some highlights of this work:

1. Support 3 kinds of execution mode: local, remote, yarn
2. Support multiple languages in one Flink session: scala, python, sql
3. Support hive connector (reading from hive and writing to hive)
4. Dependency management
5. UDF support (scala, pyflink)
6. Support both batch sql and streaming sql

For more details and usage instructions, you can refer to the following 4 blogs:

1) Get started: https://link.medium.com/oppqD6dIg5 <https://t.co/PTouUYYTrv?amp=1>
2) Batch: https://link.medium.com/3qumbwRIg5 <https://t.co/Yo9QAY0Joj?amp=1>
3) Streaming: https://link.medium.com/RBHa2lTIg5 <https://t.co/sUapN40tvI?amp=1>
4) Advanced usage: https://link.medium.com/CAekyoXIg5 <https://t.co/MXolULmafZ?amp=1>

You are welcome to try flink on zeppelin and to share feedback and comments.

-- 
Best Regards

Jeff Zhang


Re: Flink SQL Gateway

2020-04-16 Thread Jeff Zhang
Hi Flavio,

If you would like to have a UI to register data sources, run flink sql
and preview the sql result, then you can use zeppelin directly. You can
check the tutorials here:
1) Get started https://link.medium.com/oppqD6dIg5 <https://t.co/PTouUYYTrv?amp=1>
2) Batch https://link.medium.com/3qumbwRIg5 <https://t.co/Yo9QAY0Joj?amp=1>
3) Streaming https://link.medium.com/RBHa2lTIg5 <https://t.co/sUapN40tvI?amp=1>
4) Advanced usage https://link.medium.com/CAekyoXIg5 <https://t.co/MXolULmafZ?amp=1>

And here's one article shared by someone else about how to use flink on
zeppelin.
https://medium.com/@abdelkrim.hadjidj/event-driven-supply-chain-for-crisis-with-flinksql-be80cb3ad4f9

Besides that, Zeppelin provides a rest api which you can use to integrate
with other systems, but it is not a standard jdbc protocol.
http://zeppelin.apache.org/docs/0.9.0-preview1/usage/rest_api/notebook.html
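
As a rough illustration of that integration point, the sketch below builds
(without sending) an HTTP request against Zeppelin's notebook rest api, using
only the JDK's java.net.http client (Java 11+). The /api/notebook path for
listing notes is my reading of the rest api page linked above; the class name,
method name and base URL are hypothetical, so please check the path against
your own Zeppelin version.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Zeppelin: prepares a "list notes" request.
final class ZeppelinNotebookRequest {

    // Builds a GET request for <zeppelinBaseUrl>/api/notebook.
    // The endpoint path is an assumption taken from the Zeppelin REST docs.
    static HttpRequest listNotes(String zeppelinBaseUrl) {
        // Drop a trailing slash so the joined URL has exactly one separator.
        String base = zeppelinBaseUrl.endsWith("/")
                ? zeppelinBaseUrl.substring(0, zeppelinBaseUrl.length() - 1)
                : zeppelinBaseUrl;
        return HttpRequest.newBuilder()
                .uri(URI.create(base + "/api/notebook"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // The base URL here is a placeholder for your own Zeppelin server.
        HttpRequest request = listNotes("http://localhost:8080");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request with java.net.http.HttpClient returns JSON rather than a
jdbc result set, which is the difference from a standard jdbc protocol noted
above.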

I have also been making more improvements recently, and I will reveal more
details at next week's flink forward.
https://www.flink-forward.org/sf-2020/conference-program#it%E2%80%99s-finally-here--python-on-flink---flink-on-zeppelin





On Thu, Apr 16, 2020 at 8:24 PM Flavio Pompermaier wrote:

> Basically we want to give a UI to the user to register its data sources
> (i.e. catalogs in the Flink world), preview them (SELECT * LIMIT 100 for
> example) but, in the case of JDBC catalogs, also to see relationships and
> triggers.
> We don't want to reimplement the wheel so we would like to reuse and
> contribute to Flink as much as possible (since then in the batch jobs we
> use Flink and we don't like to do the same work twice..).
> In this way we can contribute to Flink if something is missing in the SQL
> Gateway. However I don't know how to extend the existing stuff (for example
> if I want table relationships and triggers)..
>
> Best,
> Flavio
>
> On Thu, Apr 16, 2020 at 1:38 PM godfrey he  wrote:
>
>> Hi Flavio,
>>
>> Since 1.11 (master), Flink supports "CREATE CATALOG ..." [1];
>> we can use this statement to create a catalog dynamically.
>>
>> Currently, Catalog [2] does not support any operations on TRIGGER.
>> Flink also can't use such info now. What's your user scenario?
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15349
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/catalogs.html#catalog-api
>>
>> Best,
>> Godfrey
>>
>> On Thu, Apr 16, 2020 at 6:16 PM Flavio Pompermaier wrote:
>>
>>> Hi Godfrey,
>>> I'd like to use the SQL gateway as a data proxy in our architecture.
>>> However, catalogs in our use case are not known at configuration time.
>>> Is there a way to register a JDBC catalog (for example when I
>>> want to connect to a Postgres database)?
>>> What if I want to add SHOW TRIGGERS? Do you think it could be
>>> interesting?
>>>
>>> On Thu, Apr 16, 2020 at 10:55 AM godfrey he  wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> We proposed FLIP-91 [1] to support SQL Gateway at the beginning of this
>>>> year.
>>>> After a long discussion, we reached an agreement that, as a first step,
>>>> SQL Gateway would be an ecosystem project under ververica [2],
>>>> which could help SQL Gateway move forward faster.
>>>> We have now almost finished developing the first version, and some users
>>>> are trying it out.
>>>> Any suggestions are welcome!
>>>>
>>>> [1]
>>>> https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit?ts=5e4cd711#heading=h.sru3xzbt2i1k
>>>> [2] https://github.com/ververica/flink-sql-gateway
>>>>
>>>> Best,
>>>> Godfrey
>>>>
>>>> On Thu, Apr 16, 2020 at 4:42 PM Flavio Pompermaier wrote:
>>>>
>>>>> Hi Jeff,
>>>>> FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL
>>>>> but since then no progress has been made on that point. Do you think that
>>>>> Zeppelin could be used somehow as a SQL Gateway towards Flink for the
>>>>> moment?
>>>>> Any chance that a Flink SQL Gateway could ever be developed? Is there
>>>>> anybody interested in this?
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> [1]
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>>>>>
>>>>

-- 
Best Regards

Jeff Zhang


Re: Job manager URI rpc address:port

2020-04-19 Thread Jeff Zhang
Hi Som,

You can take a look at flink on zeppelin. In zeppelin you can connect to a
remote flink cluster via a few configuration settings, and you don't need to
worry about the jars. The flink interpreter will ship the necessary jars for
you. Here's a list of tutorials.

1) Get started https://link.medium.com/oppqD6dIg5 <https://t.co/PTouUYYTrv?amp=1>
2) Batch https://link.medium.com/3qumbwRIg5 <https://t.co/Yo9QAY0Joj?amp=1>
3) Streaming https://link.medium.com/RBHa2lTIg5 <https://t.co/sUapN40tvI?amp=1>
4) Advanced usage https://link.medium.com/CAekyoXIg5 <https://t.co/MXolULmafZ?amp=1>


On Sun, Apr 19, 2020 at 7:27 PM Zahid Rahman wrote:

> Hi Tison,
>
> I think I may have found what I want in example 22.
>
> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>
> I need to create Configuration object first as shown .
>
> Also I think  flink-conf.yaml file may contain configuration for client
> rather than  server. So before starting is irrelevant.
> I am going to play around and see but if the Configuration class allows me
> to set configuration programmatically and overrides the yaml file then that
> would be great.
>
>
>
> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>
>> Thanks.
>> flink-conf.yaml does allow me to do what I need to do without making any
>> changes to client source code.
>>
>> But
>> RemoteStreamEnvironment constructor  expects a jar file as the third
>> parameter also.
>>
>> RemoteStreamEnvironment
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.html#RemoteStreamEnvironment-java.lang.String-int-java.lang.String...->
>> (String
>> <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true>
>>  host,
>> int port, String
>> <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true>
>> ... jarFiles)
>> Creates a new RemoteStreamEnvironment that points to the master
>> (JobManager) described by the given host name and port.
>>
>> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>>
>>> You can change the flink-conf.yaml "jobmanager.rpc.address" or
>>> "jobmanager.rpc.port" options before running the program, or take a look at
>>> RemoteStreamEnvironment, which enables configuring host and port.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> On Sun, Apr 19, 2020 at 5:58 PM Som Lima wrote:
>>>
>>>> Hi,
>>>>
>>>> After running
>>>>
>>>> $ ./bin/start-cluster.sh
>>>>
>>>> The following line of code defaults the jobmanager to localhost:6123
>>>>
>>>> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> which is the same as in spark:
>>>>
>>>> val spark =
>>>> SparkSession.builder.master("local[*]").appName("anapp").getOrCreate()
>>>>
>>>> However if I wish to run the servers on a different physical computer.
>>>> Then in Spark I can do it this way using the spark URI in my IDE.
>>>>
>>>> Conf =
>>>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>>>
>>>> Can you please tell me the equivalent change to make so I can run my
>>>> servers and my IDE from different physical computers.
>>>>

-- 
Best Regards

Jeff Zhang


Re: Job manager URI rpc address:port

2020-04-19 Thread Jeff Zhang
Som, let us know if you run into any problems.

On Mon, Apr 20, 2020 at 2:31 AM Som Lima wrote:

> Thanks for the info and links.
>
> I had a lot of problems I am not sure what I was doing wrong.
>
> May be conflicts with setup from apache spark.  I think I may need to
> setup users for each development.
>
>
> Anyway I kept doing fresh installs about four altogether I think.
>
> Everything works fine now
> Including remote access  of zeppelin on machines across the local area
> network.
>
> Next step  setup remote clusters
>  Wish me luck !
>
> On Sun, 19 Apr 2020, 14:58 Jeff Zhang,  wrote:
>
>> Hi Som,
>>
>> You can take a look at flink on zeppelin, in zeppelin you can connect to
>> a remote flink cluster via a few configuration, and you don't need to worry
>> about the jars. Flink interpreter will ship necessary jars for you. Here's
>> a list of tutorials.
>>
>> 1) Get started https://link.medium.com/oppqD6dIg5 <https://t.co/PTouUYYTrv?amp=1>
>> 2) Batch https://link.medium.com/3qumbwRIg5 <https://t.co/Yo9QAY0Joj?amp=1>
>> 3) Streaming https://link.medium.com/RBHa2lTIg5 <https://t.co/sUapN40tvI?amp=1>
>> 4) Advanced usage https://link.medium.com/CAekyoXIg5 <https://t.co/MXolULmafZ?amp=1>
>>
>>
>> On Sun, Apr 19, 2020 at 7:27 PM Zahid Rahman wrote:
>>
>>> Hi Tison,
>>>
>>> I think I may have found what I want in example 22.
>>>
>>> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>>>
>>> I need to create Configuration object first as shown .
>>>
>>> Also I think  flink-conf.yaml file may contain configuration for client
>>> rather than  server. So before starting is irrelevant.
>>> I am going to play around and see but if the Configuration class allows
>>> me to set configuration programmatically and overrides the yaml file then
>>> that would be great.
>>>
>>>
>>>
>>> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>>>
>>>> Thanks.
>>>> flink-conf.yaml does allow me to do what I need to do without making
>>>> any changes to client source code.
>>>>
>>>> But
>>>> RemoteStreamEnvironment constructor  expects a jar file as the third
>>>> parameter also.
>>>>
>>>> RemoteStreamEnvironment
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.html#RemoteStreamEnvironment-java.lang.String-int-java.lang.String...->
>>>> (String
>>>> <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true>
>>>>  host,
>>>> int port, String
>>>> <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true>
>>>> ... jarFiles)
>>>> Creates a new RemoteStreamEnvironment that points to the master
>>>> (JobManager) described by the given host name and port.
>>>>
>>>> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>>>>
>>>>> You can change the flink-conf.yaml "jobmanager.rpc.address" or
>>>>> "jobmanager.rpc.port" options before running the program, or take a look
>>>>> at RemoteStreamEnvironment, which enables configuring host and port.
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>>
>>>>> On Sun, Apr 19, 2020 at 5:58 PM Som Lima wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> After running
>>>>>>
>>>>>> $ ./bin/start-cluster.sh
>>>>>>
>>>>>> The following line of code defaults the jobmanager to localhost:6123
>>>>>>
>>>>>> final ExecutionEnvironment env =
>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>
>>>>>> which is the same as in spark:
>>>>>>
>>>>>> val spark =
>>>>>> SparkSession.builder.master("local[*]").appName("anapp").getOrCreate()
>>>>>>
>>>>>> However if I wish to run the servers on a different physical computer.
>>>>>> Then in Spark I can do it this way using the spark URI in my IDE.
>>>>>>
>>>>>> Conf =
>>>>>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>>>>>
>>>>>> Can you please tell me the equivalent change to make so I can run my
>>>>>> servers and my IDE from different physical computers.
>>>>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


Re: Job manager URI rpc address:port

2020-04-19 Thread Jeff Zhang
Glad to hear that.

On Mon, Apr 20, 2020 at 8:08 AM Som Lima wrote:

> I will, thanks. Once I had it set up and working,
> I switched my computers around from client to server and server to client.
> With your excellent instructions I was able to do it in 5 minutes.
>
> On Mon, 20 Apr 2020, 00:05 Jeff Zhang,  wrote:
>
>> Som, Let us know when you have any problems
>>
>> On Mon, Apr 20, 2020 at 2:31 AM Som Lima wrote:
>>
>>> Thanks for the info and links.
>>>
>>> I had a lot of problems I am not sure what I was doing wrong.
>>>
>>> May be conflicts with setup from apache spark.  I think I may need to
>>> setup users for each development.
>>>
>>>
>>> Anyway I kept doing fresh installs about four altogether I think.
>>>
>>> Everything works fine now
>>> Including remote access  of zeppelin on machines across the local area
>>> network.
>>>
>>> Next step  setup remote clusters
>>>  Wish me luck !
>>>
>>> On Sun, 19 Apr 2020, 14:58 Jeff Zhang,  wrote:
>>>
>>>> Hi Som,
>>>>
>>>> You can take a look at flink on zeppelin, in zeppelin you can connect
>>>> to a remote flink cluster via a few configuration, and you don't need to
>>>> worry about the jars. Flink interpreter will ship necessary jars for you.
>>>> Here's a list of tutorials.
>>>>
>>>> 1) Get started https://link.medium.com/oppqD6dIg5 <https://t.co/PTouUYYTrv?amp=1>
>>>> 2) Batch https://link.medium.com/3qumbwRIg5 <https://t.co/Yo9QAY0Joj?amp=1>
>>>> 3) Streaming https://link.medium.com/RBHa2lTIg5 <https://t.co/sUapN40tvI?amp=1>
>>>> 4) Advanced usage https://link.medium.com/CAekyoXIg5 <https://t.co/MXolULmafZ?amp=1>
>>>>
>>>>
>>>> On Sun, Apr 19, 2020 at 7:27 PM Zahid Rahman wrote:
>>>>
>>>>> Hi Tison,
>>>>>
>>>>> I think I may have found what I want in example 22.
>>>>>
>>>>> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>>>>>
>>>>> I need to create Configuration object first as shown .
>>>>>
>>>>> Also I think  flink-conf.yaml file may contain configuration for
>>>>> client rather than  server. So before starting is irrelevant.
>>>>> I am going to play around and see but if the Configuration class
>>>>> allows me to set configuration programmatically and overrides the yaml
>>>>> file then that would be great.
>>>>>
>>>>>
>>>>>
>>>>> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>>>>>
>>>>>> Thanks.
>>>>>> flink-conf.yaml does allow me to do what I need to do without making
>>>>>> any changes to client source code.
>>>>>>
>>>>>> But
>>>>>> RemoteStreamEnvironment constructor  expects a jar file as the third
>>>>>> parameter also.
>>>>>>
>>>>>> RemoteStreamEnvironment
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.html#RemoteStreamEnvironment-java.lang.String-int-java.lang.String...->
>>>>>> (String
>>>>>> <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true>
>>>>>>  host,
>>>>>> int port, String
>>>>>> <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true>
>>>>>> ... jarFiles)
>>>>>> Creates a new RemoteStreamEnvironment that points to the master
>>>>>> (JobManager) described by the given host name and port.
>>>>>>
>>>>>> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>>>>>>
>>>>>>> You can change the flink-conf.yaml "jobmanager.rpc.address" or
>>>>>>> "jobmanager.rpc.port" options before running the program, or take a
>>>>>>> look at RemoteStreamEnvironment, which enables configuring host and port.
>>>>>>>
>>>>>>> Best,
>>>>>>> tison.
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Apr 19, 2020 at 5:58 PM Som Lima wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> After running
>>>>>>>>
>>>>>>>> $ ./bin/start-cluster.sh
>>>>>>>>
>>>>>>>> The following line of code defaults the jobmanager to localhost:6123
>>>>>>>>
>>>>>>>> final ExecutionEnvironment env =
>>>>>>>> ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>
>>>>>>>> which is the same as in spark:
>>>>>>>>
>>>>>>>> val spark =
>>>>>>>> SparkSession.builder.master("local[*]").appName("anapp").getOrCreate()
>>>>>>>>
>>>>>>>> However if I wish to run the servers on a different physical
>>>>>>>> computer.
>>>>>>>> Then in Spark I can do it this way using the spark URI in my IDE.
>>>>>>>>
>>>>>>>> Conf =
>>>>>>>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>>>>>>>
>>>>>>>> Can you please tell me the equivalent change to make so I can run
>>>>>>>> my servers and my IDE from different physical computers.
>>>>>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


Re: FLINK JOB

2020-04-20 Thread Jeff Zhang
How do you run the flink job? It should not always be localhost:8081.

On Mon, Apr 20, 2020 at 4:33 PM Som Lima wrote:

> Hi,
>
> FLINK JOB  url  defaults to localhost
>
> i.e. localhost:8081.
>
> I have to manually change it to server :8081 to get Apache  flink  Web
> Dashboard to display.
>
>
>
>
>

-- 
Best Regards

Jeff Zhang


Re: FLINK JOB

2020-04-20 Thread Jeff Zhang
I see, so you are running the flink interpreter in local mode, but you access
zeppelin from a remote machine, right? Do you mean you can access it
after changing localhost to the ip? If so, then I can add a configuration on
the zeppelin side to replace localhost with the real ip.
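
To make the idea concrete, here is a minimal sketch of such a rewrite in plain
Java. It is only an illustration of swapping a loopback host for a reachable
one, not how zeppelin actually implements (or will implement) this; the class
name, method name and the example ip are made up.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not Zeppelin code: rewrites a loopback job URL.
final class JobUrlRewriter {

    // Returns jobUrl with "localhost"/"127.0.0.1" replaced by reachableHost.
    // URLs that point at any other host are returned unchanged.
    static String replaceLocalhost(String jobUrl, String reachableHost) {
        URI uri = URI.create(jobUrl);
        String host = uri.getHost();
        boolean loopback = "localhost".equals(host) || "127.0.0.1".equals(host);
        if (!loopback) {
            return jobUrl;
        }
        try {
            // Keep scheme, port, path, query and fragment; swap only the host.
            return new URI(uri.getScheme(), uri.getUserInfo(), reachableHost,
                    uri.getPort(), uri.getPath(), uri.getQuery(),
                    uri.getFragment()).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + jobUrl, e);
        }
    }

    public static void main(String[] args) {
        // 192.168.1.20 stands in for the real ip of the zeppelin/flink machine.
        System.out.println(replaceLocalhost("http://localhost:8081", "192.168.1.20"));
    }
}
```

Note that the URL needs a scheme (http://...) for the host to be recognised; a
bare "localhost:8081" is returned unchanged by this sketch.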

On Mon, Apr 20, 2020 at 4:44 PM Som Lima wrote:

> I am only running the zeppelin  word count example by clicking the
> zeppelin run arrow.
>
>
> On Mon, 20 Apr 2020, 09:42 Jeff Zhang,  wrote:
>
>> How do you run flink job ? It should not always be localhost:8081
>>
>> On Mon, Apr 20, 2020 at 4:33 PM Som Lima wrote:
>>
>>> Hi,
>>>
>>> FLINK JOB  url  defaults to localhost
>>>
>>> i.e. localhost:8081.
>>>
>>> I have to manually change it to server :8081 to get Apache  flink
>>> Web Dashboard to display.
>>>
>>>
>>>
>>>
>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


microsecond resolution

2016-12-04 Thread jeff jacobson
I've searched stackoverflow, the docs, and the web but I can't figure out:
does flink support microsecond timestamp resolution? Thanks!


Re: microsecond resolution

2016-12-04 Thread jeff jacobson
Sorry if I'm missing something. That link mentions milliseconds, no? My
question is whether or not I can specify microseconds, where
1000 microseconds = 1 millisecond. Thanks!

On Sun, Dec 4, 2016 at 5:05 PM, Matthias J. Sax  wrote:

> Yes. It does.
>
> See:
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_timestamps_watermarks.html#assigning-timestamps
>
> "Both timestamps and watermarks are specified as milliseconds since the
> Java epoch of 1970-01-01T00:00:00Z."
>
>
>
> -Matthias
>
>
> On 12/04/2016 10:57 AM, jeff jacobson wrote:
> > I've sourced stackoverflow, the docs, and the web but I can't figure
> > out: does flink support microsecond timestamp resolution? Thanks!
>
>


Re: microsecond resolution

2016-12-04 Thread jeff jacobson
Wow. Really? Is there a way to do micros? A hack? A Jira story? Most (all?)
U.S. equity and European futures, options, and stock markets timestamp in
microseconds. This makes Flink unusable for a massive industry vertical. To
the extent lower-frequency time-series data is being used (e.g. end-of-day
prices), stream processing is kind of overkill. Love everything I've read
about Flink...there's got to be a way to make this work, no?

On Sun, Dec 4, 2016 at 5:27 PM, Matthias J. Sax  wrote:

> Oh. My bad... Did not read your question carefully enough.
>
> Then the answer is no, it does not support microseconds (only
> milliseconds).
>
> -Matthias
>
>
> On 12/4/16 2:22 PM, jeff jacobson wrote:
> > Sorry if I'm missing something. That link mentions milliseconds,
> > no? My question is whether or not I can specify microseconds where
> > 1000microseconds = 1millisecond. Thanks!
> >
> > On Sun, Dec 4, 2016 at 5:05 PM, Matthias J. Sax wrote:
> >
> > Yes. It does.
> >
> > See:
> > https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_timestamps_watermarks.html#assigning-timestamps
> >
> > "Both timestamps and watermarks are specified as milliseconds
> > since the Java epoch of 1970-01-01T00:00:00Z."
> >
> >
> >
> > -Matthias
> >
> >
> > On 12/04/2016 10:57 AM, jeff jacobson wrote:
> >> I've sourced stackoverflow, the docs, and the web but I can't
> >> figure out: does flink support microsecond timestamp resolution?
> >> Thanks!
> >
> >
>


Re: microsecond resolution

2016-12-05 Thread jeff jacobson
Thanks Kostas. So if we're comfortable treating timestamps as longs (and
doing conversions to human readable time at our application level), we can
use CEP, ML lib etc. in addition to all basic Flink functions? That's great
news?

To Matthias's point, why then does the following not read "Both
timestamps and watermarks are specified as longs"? Before I go headlong
into developing on Flink, I just want to be sure I'm covered here. Again,
thanks. (The Youtube videos from FlinkForward are also great, btw.
Incredibly impressed with Data Artisans.)


https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_timestamps_watermarks.html#assigning-timestamps

"Both timestamps and watermarks are specified as milliseconds since the Java
epoch of 1970-01-01T00:00:00Z."
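
For what it's worth, here is a minimal sketch of what "treating timestamps as
longs" could look like at the application level: microseconds since the epoch
packed into a long, and converted back to human-readable time only at the
edges. It is plain JDK code with no Flink types involved; the class and method
names are made up for illustration.

```java
import java.time.Instant;

// Hypothetical application-level helper: epoch microseconds <-> Instant.
final class EpochMicros {

    private static final long MICROS_PER_SECOND = 1_000_000L;
    private static final long NANOS_PER_MICRO = 1_000L;

    // Packs an Instant into microseconds since 1970-01-01T00:00:00Z,
    // dropping anything finer than a microsecond.
    static long toEpochMicros(Instant instant) {
        return Math.addExact(
                Math.multiplyExact(instant.getEpochSecond(), MICROS_PER_SECOND),
                instant.getNano() / NANOS_PER_MICRO);
    }

    // Converts epoch microseconds back to an Instant for display.
    static Instant fromEpochMicros(long micros) {
        long seconds = Math.floorDiv(micros, MICROS_PER_SECOND);
        long microOfSecond = Math.floorMod(micros, MICROS_PER_SECOND);
        return Instant.ofEpochSecond(seconds, microOfSecond * NANOS_PER_MICRO);
    }

    public static void main(String[] args) {
        Instant tradeTime = Instant.parse("2016-12-04T22:05:00.123456Z");
        long micros = toEpochMicros(tradeTime);
        System.out.println(micros + " -> " + fromEpochMicros(micros));
    }
}
```

A signed 64-bit long has room for roughly 292,000 years of microseconds on
either side of the epoch, so range is not the concern; the open question is
only which Flink components assume the value is in milliseconds.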



On Mon, Dec 5, 2016 at 4:57 AM, Kostas Kloudas 
wrote:

> Hi Jeff,
>
> Actually in Flink timestamps are simple longs.
> This means that you can assign anything you want as a timestamp, as long
> as it fits in a long.
>
> Hope this helps and if not, we can discuss to see if we can find a
> solution that
> fits your needs together.
>
> Cheers,
> Kostas
>
> On Dec 4, 2016, at 11:39 PM, jeff jacobson 
> wrote:
>
> Wow. Really? Is there a way to do micros? A hack? A Jira story? Most
> (all?) U.S. equity and European futures, options, and stock markets
> timestamp in microseconds. This makes Flink unusable for a massive industry
> vertical. To the extent lower-frequency time-series data is being used
> (e.g. end of data prices), stream processing is kind of overkill. Love
> everything I've read about Flink...there's got to be a way to make this
> work, no?
>
> On Sun, Dec 4, 2016 at 5:27 PM, Matthias J. Sax  wrote:
>
>> Oh. My bad... Did not read your question carefully enough.
>>
>> Then the answer is no, it does not support microseconds (only
>> milliseconds).
>>
>> -Matthias
>>
>>
>> On 12/4/16 2:22 PM, jeff jacobson wrote:
>> > Sorry if I'm missing something. That link mentions milliseconds,
>> > no? My question is whether or not I can specify microseconds where
>> > 1000microseconds = 1millisecond. Thanks!
>> >
>> > On Sun, Dec 4, 2016 at 5:05 PM, Matthias J. Sax wrote:
>> >
>> > Yes. It does.
>> >
>> > See:
>> > https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_timestamps_watermarks.html#assigning-timestamps
>> >
>> > "Both timestamps and watermarks are specified as milliseconds
>> > since the Java epoch of 1970-01-01T00:00:00Z."
>> >
>> >
>> >
>> > -Matthias
>> >
>> >
>> > On 12/04/2016 10:57 AM, jeff jacobson wrote:
>> >> I've sourced stackoverflow, the docs, and the web but I can't
>> >> figure out: does flink support microsecond timestamp resolution?
>> >> Thanks!
>> >
>> >
>>
>
>
>


Re: microsecond resolution

2016-12-05 Thread jeff jacobson
Thanks for clearing things up, Stephan and Kostas
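
For anyone finding this thread later, this is roughly how I picture the
re-interpretation Stephan describes in the quoted reply: the window arithmetic
itself does not care about the unit, as long as the timestamp and the window
size use the same one. The sketch below is plain Java, not Flink's window
assigner API; the class and method names are made up, and wiring it into a
custom window assigner is not shown.

```java
// Hypothetical illustration: tumbling-window bucketing in microsecond units.
final class MicrosWindows {

    // Returns the start of the tumbling window containing timestampMicros.
    // Both arguments are in microseconds; nothing here assumes milliseconds.
    static long windowStart(long timestampMicros, long windowSizeMicros) {
        if (windowSizeMicros <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        // floorMod keeps the result correct for timestamps before the epoch.
        return timestampMicros - Math.floorMod(timestampMicros, windowSizeMicros);
    }

    public static void main(String[] args) {
        long windowSizeMicros = 500L; // a 500-microsecond tumbling window
        System.out.println(windowStart(1_234_567L, windowSizeMicros));
    }
}
```

The caveat from the quoted reply still applies: anything that mixes in
processing time (System.currentTimeMillis()) is in milliseconds, so it should
not be combined with timestamps bucketed this way.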

On Mon, Dec 5, 2016 at 8:08 AM, Kostas Kloudas 
wrote:

> Hi Jeff,
>
> As Stephan said the interpretation of the timestamps is up to the logic of
> your job.
> And as for the documentation, thanks for reporting this.
> We should update it.
>
> Cheers,
> Kostas
>
> On Dec 5, 2016, at 1:56 PM, Stephan Ewen  wrote:
>
> @Jeff - good point about the docs.
>
> I think Kostas is right though - the event timestamps are up to the user's
> interpretation.
>
> The built-in window assigners interpret them as "Unix Epoch Millis", but
> you can define your own window assigners that interpret the timestamps
> differently.
> The system also interprets them as Unix Epoch Millis when mixing event
> time and processing time (because processing time comes from
> System.currentTimeMillis()).
>
> So, you can "re-interpret" them by using custom window assigners and not
> using processing time.
> If you want to use a processing-time-like component, I'd suggest
> incorporating that in your watermark generator.
>
> Stephan
>
>
>
> On Mon, Dec 5, 2016 at 1:05 PM, jeff jacobson wrote:
>
>> Thanks Kostas. So if we're comfortable treating timestamps as longs (and
>> doing conversions to human readable time at our application level), we can
>> use CEP, ML lib etc. in addition to all basic Flink functions? That's great
>> news?
>>
>> To Matthias's point, *why then does the following not read "**Both
>> timestamps and watermarks are specified as longs"?* Before I go headlong
>> into developing on Flink, I just want to be sure I'm covered here. Again,
>> thanks. (The Youtube videos from FlinkForward are also great, btw.
>> Incredibly impressed with Data Artisans.)
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_timestamps_watermarks.html#assigning-timestamps
>>
>> "Both timestamps and watermarks are specified as milliseconds since the Java
>> epoch of 1970-01-01T00:00:00Z."
>>
>>
>>
>> On Mon, Dec 5, 2016 at 4:57 AM, Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi Jeff,
>>>
>>> Actually in Flink timestamps are simple longs.
>>> This means that you can assign anything you want as a timestamp, as long
>>> as it fits in a long.
>>>
>>> Hope this helps and if not, we can discuss to see if we can find a
>>> solution that
>>> fits your needs together.
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Dec 4, 2016, at 11:39 PM, jeff jacobson <
>>> jeff.harold.jacob...@gmail.com> wrote:
>>>
>>> Wow. Really? Is there a way to do micros? A hack? A Jira story? Most
>>> (all?) U.S. equity and European futures, options, and stock markets
>>> timestamp in microseconds. This makes Flink unusable for a massive industry
>>> vertical. To the extent lower-frequency time-series data is being used
>>> (e.g. end of data prices), stream processing is kind of overkill. Love
>>> everything I've read about Flink...there's got to be a way to make this
>>> work, no?
>>>
>>> On Sun, Dec 4, 2016 at 5:27 PM, Matthias J. Sax 
>>> wrote:
>>>
>>>> Oh. My bad... Did not read your question carefully enough.
>>>>
>>>> Then the answer is no, it does not support microseconds (only
>>>> milliseconds).
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 12/4/16 2:22 PM, jeff jacobson wrote:
>>>> > Sorry if I'm missing something. That link mentions milliseconds,
>>>> > no? My question is whether or not I can specify microseconds where
>>>> > 1000microseconds = 1millisecond. Thanks!
>>>> >
>>>> > On Sun, Dec 4, 2016 at 5:05 PM, Matthias J. Sax wrote:
>>>> >
>>>> > Yes. It does.
>>>> >
>>>> > See:
>>>> > https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>>>> apis/strea
>>>> ming/event_timestamps_watermarks.html#assigning-timestamps
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/event_timestamps_watermarks.html#assigning-timestamps>
>>>> >
>>>> >
>>>> <https://ci.apache.org/pro

Can not run scala-shell in yarn mode in flink 1.5

2018-06-05 Thread Jeff Zhang
I tried to run scala-shell in yarn mode in 1.5, but hit the following error.
I can run it successfully in 1.4.2. It is the same even when I change the
mode to legacy. Is this a known issue, or did something change in 1.5? Thanks

Command I Use: bin/start-scala-shell.sh yarn -n 1


Starting Flink Shell:
2018-06-06 12:30:02,672 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2018-06-06 12:30:02,673 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 1024
2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.mb, 1024
2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2018-06-06 12:30:02,674 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2018-06-06 12:30:02,675 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
Exception in thread "main" java.lang.UnsupportedOperationException: Can't deploy a standalone cluster.
    at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
    at org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
    at org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
    at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
    at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
    at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
    at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
    at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)


Re: flink 1.4.2 Ambari

2018-07-19 Thread Jeff Bean
Antonio,

Have you seen:

https://github.com/abajwa-hw/ambari-flink-service

Jeff

On Fri, Jul 13, 2018 at 7:45 PM, antonio saldivar 
wrote:

> Hello
>
> I am trying to find a way to add the Flink 1.4.2 service to ambari because
> it is not listed in the Stack. Does anyone have the steps to add this service
> manually?
>
> Thank you
> Best regards
>



-- 
Jeff Bean
Technical Evangelist
1-831-435-9847
<https://data-artisans.com/>


How to add flink table jar to classpath via bin/flink

2018-09-13 Thread Jeff Zhang
flink-table is a provided dependency, so it won't be included in the final
shaded jar. I couldn't find a way to add a custom jar to the classpath via
bin/flink. Does anyone know how? Thanks


Re: Starting a seperate Java process within a Flink cluster

2018-11-02 Thread Jeff Zhang
The error is most likely a classpath issue: the classpath when you run a
Flink program in the IDE differs from the classpath when you run it on a
cluster.

Also, starting another JVM process from a SourceFunction doesn't seem like a
good approach to me. Is it possible to do that work inside your custom
SourceFunction instead?
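
A minimal sketch of the classpath point, in plain Java and under stated
assumptions: on a cluster the job jar is typically loaded by a separate
user-code class loader, so the "java.class.path" property of the parent JVM
may not list it, and a child JVM started with only that property cannot find
the main class. The class name ChildClasspathSketch and the method
classpathFor are hypothetical; the idea is to append the location the class
was actually loaded from. Whether that location is usable on a given Flink
deployment is an assumption to verify, not something this thread confirms.

```java
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.security.CodeSource;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink or of the original poster's code.
final class ChildClasspathSketch {

    /** Returns the parent classpath plus the jar or directory mainClass was loaded from. */
    static String classpathFor(Class<?> mainClass) {
        String classpath = System.getProperty("java.class.path", "");
        CodeSource source = mainClass.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return classpath; // e.g. JDK classes have no code source
        }
        URL location = source.getLocation();
        if (!"file".equals(location.getProtocol())) {
            return classpath; // only local files can be put on a child classpath
        }
        String origin;
        try {
            origin = Paths.get(location.toURI()).toString();
        } catch (URISyntaxException | IllegalArgumentException e) {
            return classpath; // unusable location, fall back to the parent classpath
        }
        if (classpath.isEmpty()) {
            return origin;
        }
        for (String entry : classpath.split(Pattern.quote(File.pathSeparator))) {
            if (entry.equals(origin)) {
                return classpath; // already present, nothing to add
            }
        }
        return classpath + File.pathSeparator + origin;
    }

    public static void main(String[] args) {
        // Prints the classpath a child JVM would need in order to load this class.
        System.out.println(classpathFor(ChildClasspathSketch.class));
    }
}
```

The returned string would be passed after "-cp" when building the child
process. Printing it from inside the running job shows quickly whether the
jar containing MessageBufferProcess is visible at all.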


On Fri, Nov 2, 2018 at 5:22 PM Ly, The Anh wrote:

> Yes, I did. It is definitely there. I tried and made a separate Maven
> project to test if something was wrong with my jar.
> The resulting shaded jar of that test project was fine and the
> message-buffer-process was running with that test jar.
>
>
> On 02.11.2018 at 04:47, Yun Tang wrote:
> Hi
>
> Since you use the message-buffer-process as a dependency and the error
> tells you the class was not found, have you checked whether your application
> jar actually contains MessageBufferProcess.class? If it does not, try using
> the maven-assembly-plugin or the maven-shade-plugin to include your classes.
>
> Best
> Yun Tang
> --
> *From:* Ly, The Anh 
> *Sent:* Friday, November 2, 2018 6:33
> *To:* user@flink.apache.org
> *Subject:* Starting a seperate Java process within a Flink cluster
>
>
> Hello,
>
>
> I am currently working on my masters and I encountered a difficult
> problem.
>
>
> Background (for context): I am trying to connect different data stream
> processors. To do so, I am using Flink's mechanisms for creating custom
> sinks and sources to receive from and send to other data stream
> processors. In those custom sinks and sources I start a separate process
> (message-buffer-process) and use it to communicate and buffer data. My
> implementation is built with Maven and can be added as a dependency.
>
>
> Problem: I already tested my implementation by adding it as a dependency
> to a simple Flink word-count example. The test ran inside an IDE and
> worked perfectly fine. But when I package that Flink word-count example and
> try to run it with "./flink run " or by uploading and submitting it as a job,
> it tells me that my buffer-process class could not be found:
>
> In German: "Fehler: Hauptklasse
> de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess konnte nicht
> gefunden oder geladen werden"
>
> Roughly translated: "Error: Main class
> de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess could not be
> found or loaded"
>
>
> Code snippets:
>
> Example - Adding my custom sink to send data to another data stream
> processor:
>
> dataStream.addSink(
>     (SinkFunction) DSPConnectorFactory
>         .getInstance()
>         .createSinkConnector(
>             new DSPConnectorConfig
>                 .Builder("localhost", 9656)
>                 .withDSP("flink")
>                 .withBufferConnectorString("buffer-connection-string")
>                 .withHWM(20)
>                 .withTimeout(1)
>                 .build()));
>
>
>
> The way I am trying to start the separate buffer process:
>
> JavaProcessBuilder.exec(MessageBufferProcess.class, connectionString, addSentMessagesFrame);
>
> What JavaProcessBuilder.exec looks like:
>
> public static Process exec(Class javaClass, String connectionString, boolean addSentMessagesFrame)
>         throws IOException, InterruptedException {
>     // Locate the java binary of the JVM that is currently running.
>     String javaHome = System.getProperty("java.home");
>     String javaBin = javaHome +
>         File.separator + "bin" +
>         File.separator + "java";
>     // Reuse the classpath of the current JVM for the child process.
>     String classpath = System.getProperty("java.class.path");
>     String className = javaClass.getCanonicalName();
>
>     System.out.println("Trying to build process " + classpath + " " + className);
>
>     ProcessBuilder builder = new ProcessBuilder(
>         javaBin, "-cp", classpath, className, connectionString,
>         Boolean.toString(addSentMessagesFrame));
>
>     // Forward the child's stdout and stderr to the parent process.
>     builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
>     builder.redirectError(ProcessBuilder.Redirect.INHERIT);
>
>     Process process = builder.start();
>     return process;
> }
>
> I also tried running that message-buffer process separately in another maven 
> project and its packaged .jar file. That worked perfectly fine too. That is 
> why I am assuming that my approach is not appropriate for running in Flink.
> Did I miss something and starting my approach doesn't actually work within 
> Flink's context? I hope the information I gave you is sufficient to help 
> understanding my issue. If you need any more information feel free to message 
> me!
>
> Thanks for any help!
>
>  With best regards
>
>
>


Field could not be resolved by the field mapping when using kafka connector

2018-11-13 Thread Jeff Zhang
Hi,

I hit the following error when trying to use the Kafka connector in the Flink
Table API. There is very little documentation on how to use the Kafka
connector with the Table API; could anyone help me with that? Thanks

Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'event_ts' could not be resolved by the field mapping.
    at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
    at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
    at org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
    at org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
    at org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
    at org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
    at org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)

And here's the source code:



case class Record(status: String, direction: String, var event_ts: Timestamp)

def main(args: Array[String]): Unit = {
  val senv = StreamExecutionEnvironment.getExecutionEnvironment
  senv.setParallelism(1)
  senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val data: DataStream[Record] = ...
  val tEnv = TableEnvironment.getTableEnvironment(senv)
  tEnv
    // declare the external system to connect to
    .connect(
      new Kafka()
        .version("0.11")
        .topic("processed5.events")
        .startFromEarliest()
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092"))
    .withFormat(
      new Json()
        .failOnMissingField(false)
        .deriveSchema())
    .withSchema(
      new Schema()
        .field("status", Types.STRING)
        .field("direction", Types.STRING)
        .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
          new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending()))

    // specify the update-mode for streaming tables
    .inAppendMode()

    // register as source, sink, or both and under a name
    .registerTableSourceAndSink("MyUserTable")

  tEnv.fromDataStream(data).insertInto("MyUserTable")
}



Confused window operation

2018-11-13 Thread Jeff Zhang
Hi all,

I am a little confused with the following windows operation. Here's the
code,

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setParallelism(1)
val data = senv.fromElements("hello world", "hello flink", "hello hadoop")

data.flatMap(line => line.split("\\s"))
  .map(w => (w, 1))
  .keyBy(0)
  .countWindow(2, 1)
  .sum(1)
  .print("**")

senv.execute()


And this is the output:

**> (hello,1)
**> (world,1)
**> (hello,2)
**> (flink,1)
**> (hello,2)
**> (hadoop,1)


As I understand it, we have 3 windows here.

window 1

(hello, world)

window 2

(hello, world)

(hello, flink)

window 3

(hello flink)

(hello hadoop)

So for the first window, we have output (hello, 1) (world, 1)

for the second window we should output (hello, 2), (world,1 ), (flink, 1)

for the third window we should have output (hello, 2), (flink, 1), (hadoop, 1)


But as you can see above, I get a different result. Am I misunderstanding
the window? Could anyone help me understand this?
Thanks


Re: Field could not be resolved by the field mapping when using kafka connector

2018-11-13 Thread Jeff Zhang
Thanks hequn, it is very helpful

On Wed, Nov 14, 2018 at 2:32 PM Hequn Cheng  wrote:

> Hi jeff,
>
> We need a different field name for the rowtime indicator, something
> like:
>
>>   new Schema()
>>     .field("status", Types.STRING)
>>     .field("direction", Types.STRING)
>>     .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
>>       new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
>
>
> Furthermore, we should define another sink schema which contains no
> rowtime definitions, since currently time attributes and custom field
> mappings are not supported yet for sink.
>
>> val sinkSchema =
>>   new Schema()
>> .field("status", Types.STRING)
>> .field("direction", Types.STRING)
>> .field("rowtime", Types.SQL_TIMESTAMP)
>
>
> Btw, a unified api for source and sink is under discussion now. More
> details here[1]
>
> Best, Hequn
>
> [1]
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#heading=h.41fd6rs7b3cf
>
>
> On Wed, Nov 14, 2018 at 9:18 AM Jeff Zhang  wrote:
>
>>
>> Hi,
>>
>> I hit the following error when I try to use kafka connector in flink
>> table api. There's very little document about how to use kafka connector in
>> flink table api, could anyone help me on that ? Thanks
>>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Field 'event_ts' could not
>> be resolved by the field mapping.
>> at
>> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
>> at
>> org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
>> at
>> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
>> at
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
>> at
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
>>
>> And here's the source code:
>>
>>
>>
>>  case class Record(status: String, direction: String, var event_ts: 
>> Timestamp)
>>
>>
>>   def main(args: Array[String]): Unit = {
>> val senv = StreamExecutionEnvironment.getExecutionEnvironment
>> senv.setParallelism(1)
>> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>> val data: DataStream[Record] = ...
>> val tEnv = TableEnvironment.getTableEnvironment(senv)
>> tEnv
>>   // declare the external system to connect to
>>   .connect(
>>   new Kafka()
>> .version("0.11")
>> .topic("processed5.events")
>> .startFromEarliest()
>> .property("zookeeper.connect", "localhost:2181")
>> .property("bootstrap.servers", "localhost:9092"))
>>   .withFormat(new Json()
>> .failOnMissingField(false)
>> .deriveSchema()
>>   )
>>   .withSchema(
>> new Schema()
>>   .field("status", Types.STRING)
>>   .field("direction", Types.STRING)
>>   .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
>>   new 
>> Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
>>   )
>>
>>   // specify the update-mode for streaming tables
>>   .inAppendMode()
>>
>>   // register as source, sink, or both and under a name
>>   .registerTableSourceAndSink("MyUserTable");
>>
>> tEnv.fromDataStream(data).insertInto("MyUserTable")
>>
>>
>>

-- 
Best Regards

Jeff Zhang


Re: Confused window operation

2018-11-13 Thread Jeff Zhang
Thanks hequn & acqua.csq 

On Wed, Nov 14, 2018 at 2:17 PM Hequn Cheng  wrote:

> Hi Jeff,
>
> The window is not a global window. It is related to a specified key. You
> would have 6 windows after flatMap() and keyBy().
> key: hello with 3 windows
> key: world with 1 window
> key: flink with 1 window
> key: hadoop with 1 window
>
> Best, Hequn
>
>
> On Wed, Nov 14, 2018 at 10:31 AM Jeff Zhang  wrote:
>
>> Hi all,
>>
>> I am a little confused with the following windows operation. Here's the
>> code,
>>
>> val senv = StreamExecutionEnvironment.getExecutionEnvironment
>> senv.setParallelism(1)
>> val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
>>
>> data.flatMap(line => line.split("\\s"))
>>   .map(w => (w, 1))
>>   .keyBy(0)
>>   .countWindow(2, 1)
>>   .sum(1)
>>   .print("**")
>>
>> senv.execute()
>>
>>
>> And this is the output:
>>
>> **> (hello,1)
>> **> (world,1)
>> **> (hello,2)
>> **> (flink,1)
>> **> (hello,2)
>> **> (hadoop,1)
>>
>>
>> As my understanding, here we have 3 windows.
>>
>> window 1
>>
>> (hello, world)
>>
>> window 2
>>
>> (hello, world)
>>
>> (hello, flink)
>>
>> window 3
>>
>> (hello flink)
>>
>> (hello hadoop)
>>
>> So for the first window, we have output (hello, 1) (world, 1)
>>
>> for the second window we should output (hello, 2), (world,1 ), (flink, 1)
>>
>> for the third window we should have output (hello, 2), (flink, 1), (hadoop, 
>> 1)
>>
>>
>> But as you can see, in the above I get different result, do I misunderstand 
>> the window ? Could anyone help me to understand that ? Thanks
>>
>>

-- 
Best Regards

Jeff Zhang
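
Hequn's point in the thread above, that the count window is scoped to each
key rather than to the whole stream, can be reproduced without a cluster. The
following is a plain-Java simulation, not Flink code: the class
KeyedCountWindowSketch and its run method are hypothetical names, and the
model (keep the last "size" elements per key, emit a sum every "slide"
elements of that key) is an assumption about how countWindow(2, 1) behaves,
chosen because it matches the output posted in the thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of keyBy(word).countWindow(size, slide).sum(count).
final class KeyedCountWindowSketch {

    static List<String> run(List<String> lines, int size, int slide) {
        Map<String, Deque<Integer>> bufferByKey = new HashMap<>(); // last `size` counts per key
        Map<String, Integer> seenByKey = new HashMap<>();          // elements seen per key
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split("\\s")) {
                Deque<Integer> buffer = bufferByKey.computeIfAbsent(word, k -> new ArrayDeque<>());
                buffer.addLast(1);
                if (buffer.size() > size) {
                    buffer.removeFirst(); // evict the oldest element of this key only
                }
                int seen = seenByKey.merge(word, 1, Integer::sum);
                if (seen % slide == 0) {  // the window of this key fires
                    int sum = 0;
                    for (int value : buffer) {
                        sum += value;
                    }
                    out.add("(" + word + "," + sum + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("hello world", "hello flink", "hello hadoop");
        for (String result : run(input, 2, 1)) {
            System.out.println("**> " + result);
        }
    }
}
```

Under this model "hello" fires three times (1, 2, 2) and every other word
fires once, which gives the six results Hequn counts. A window spanning
several input lines never exists, because elements with different keys never
share a buffer.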


Are flink connectors included in the binary release ?

2018-11-14 Thread Jeff Zhang
I don't see the Flink connector jars in the binary release of Flink 1.6.1,
so I just want to confirm whether the binary release includes these
connectors. Thanks

-- 
Best Regards

Jeff Zhang


Re: Are flink connectors included in the binary release ?

2018-11-14 Thread Jeff Zhang
Thanks Chesnay, but if users want to use connectors in the scala shell, they
have to download them.

On Wed, Nov 14, 2018 at 5:22 PM Chesnay Schepler  wrote:

> Connectors are never contained in binary releases as they are supposed to
> be packaged into the user-jar.
>
> On 14.11.2018 10:12, Jeff Zhang wrote:
>
>
> I don't see the jars of flink connectors in the binary release of flink
> 1.6.1, so just want to confirm whether flink binary release include these
> connectors. Thanks
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Assigning a port range to rest.port

2018-12-05 Thread Jeff Zhang
This requirement makes sense to me. Another issue I hit because rest.port
takes a single value is that a user cannot start 2 local MiniClusters: I tried
to start 2 Flink scala-shells in local mode, but the second fails due to a
port conflict.
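
To illustrate what a port range would buy here, a small plain-Java sketch of
"take the first free port in a range". This is not Flink's implementation and
says nothing about how rest.port is parsed; PortRangeSketch and firstFreePort
are hypothetical names, and the range 8081 to 8090 is only an example.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.OptionalInt;

// Hypothetical helper: probes ports from..to (inclusive, each within 0..65535).
final class PortRangeSketch {

    static OptionalInt firstFreePort(int from, int to) {
        for (int port = from; port <= to; port++) {
            try (ServerSocket socket = new ServerSocket(port)) {
                return OptionalInt.of(socket.getLocalPort()); // bind succeeded, port was free
            } catch (IOException bindFailed) {
                // port is in use (or cannot be bound), try the next one
            }
        }
        return OptionalInt.empty(); // nothing free in the range
    }

    public static void main(String[] args) {
        System.out.println(firstFreePort(8081, 8090));
    }
}
```

With a range, a second local shell would simply land on the next free port
instead of failing. The probe socket is closed before the port is returned,
so another process can still grab it in between; a real server would keep the
bound socket rather than re-bind.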



On Wed, Dec 5, 2018 at 8:04 PM Gyula Fóra wrote:

> Hi!
> Is there any way currently to set a port range for the rest client?
> rest.port only takes a single number and it is anyways overwritten to 0.
>
> This seems to be necessary when running the flink client from behind a
> firewall where only a predefined port-range is accessible from the outside.
>
> I would assume this is a common setup in prod environments. This hasn't
> been a problem with the legacy execution mode.
>
> Any thoughts?
> Gyula
>


-- 
Best Regards

Jeff Zhang


Re: Flink with Docker: docker-compose and FLINK_JOB_ARGUMENT exception

2018-12-07 Thread Jeff Zhang
I didn't use the built-in docker of flink, but the following flink docker
works for me pretty well.

https://github.com/big-data-europe/docker-flink



On Fri, Dec 7, 2018 at 6:20 PM Piotr Nowojski wrote:

> Hi,
>
> I have never used flink and docker together, so I’m not sure if I will be
> able to help, however have you seen this README:
> https://github.com/apache/flink/tree/master/flink-container/docker
> ?
> Shouldn’t you be passing your arguments via `FLINK_JOB_ARGUMENTS`
> environment variable?
>
> Piotrek
>
> On 7 Dec 2018, at 10:55, Marke Builder  wrote:
>
> Hi,
>
> I'm trying to run Flink with Docker (docker-compose) and the job argument
> "config-dev.properties". But it seems that the job arguments are not
> available:
>
> docker-compose.yml
>
> version: '2'
> services:
>   job-cluster:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-timeseries-v1}
>     ports:
>       - '8081:8081'
>     command: job-cluster --job-classname -Djobmanager.rpc.address=job-cluster -Dparallelism.default=1 --config config-dev.properties
>
>   taskmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-timeseries-v1}
>     command: task-manager -Djobmanager.rpc.address=job-cluster
>     scale: 1
>
>
> Exception:
> 4:32 AMorg.apache.flink.runtime.entrypoint.FlinkParseException: Failed to
> parse the command line arguments.
> 12/7/2018 10:44:32 AM  at
> org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:52)
> 12/7/2018 10:44:32 AM  at
> org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:143)
> 12/7/2018 10:44:32 AMCaused by:
> org.apache.commons.cli.MissingArgumentException: Missing argument for
> option: j
> 12/7/2018 10:44:32 AM  at
> org.apache.commons.cli.DefaultParser.checkRequiredArgs(DefaultParser.java:211)
> 12/7/2018 10:44:32 AM  at
> org.apache.commons.cli.DefaultParser.handleOption(DefaultParser.java:599)
> 12/7/2018 10:44:32 AM  at
> org.apache.commons.cli.DefaultParser.handleShortAndLongOption(DefaultParser.java:548)
> 12/7/2018 10:44:32 AM  at
> org.apache.commons.cli.DefaultParser.handleToken(DefaultParser.java:243)
> 12/7/2018 10:44:32 AM  at
> org.apache.commons.cli.DefaultParser.parse(DefaultParser.java:120)
> 12/7/2018 10:44:32 AM  at
> org.apache.commons.cli.DefaultParser.parse(DefaultParser.java:81)
> 12/7/2018 10:44:32 AM  at
> org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:50)
> 12/7/2018 10:44:32 AM  ... 1 more
> 12/7/2018 10:44:32 AMException in thread "main"
> java.lang.NoSuchMethodError:
> org.apache.flink.runtime.entrypoint.parser.CommandLineParser.printHelp()V
> 12/7/2018 10:44:32 AM  at
> org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:146)
>
>
>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Apache Flink 1.5.6 released

2018-12-26 Thread Jeff Zhang
Thanks Thomas. It's nice to have a more stable flink 1.5.x

On Thu, Dec 27, 2018 at 9:43 AM vino yang wrote:

> Thomas, thanks for being a release manager.
> And Thanks for the whole community.
> I think the release of Flink 1.5.6 makes sense for many users who are
> currently unable to upgrade major versions.
>
> Best,
> Vino
>
> On Thu, Dec 27, 2018 at 8:00 AM jincheng sun wrote:
>
>> Thanks a lot for being our release manager, Thomas.
>> Thanks a lot for making this release possible!
>>
>> Cheers,
>> Jincheng
>>
>> On Thu, Dec 27, 2018 at 4:03 AM Thomas Weise wrote:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache
>>> Flink 1.5.6, which is the final bugfix release for the Apache Flink 1.5
>>> series.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data
>>> streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements
>>> for this bugfix release:
>>> https://flink.apache.org/news/2018/12/22/release-1.5.6.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344315
>>>
>>> We would like to thank all contributors of the Apache Flink community who
>>> made this release possible!
>>>
>>> Regards,
>>> Thomas
>>>
>>

-- 
Best Regards

Jeff Zhang


Re: Query on retract stream

2019-01-21 Thread Jeff Zhang
I am thinking of another approach instead of a retract stream. Is it possible
to define a custom window for this? The window would be defined per order,
and then you would just need to analyze the events in that window.

On Mon, Jan 21, 2019 at 8:44 PM Piotr Nowojski wrote:

> Hi,
>
> At the moment, Flink Table API/SQL does not support retraction streams as
> input (or conversions from an append stream to a retraction stream). With
> that feature your problem would reduce to a simple
> `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is ongoing
> related work [1], so this might be supported in the next couple of months.
>
> There might be a workaround that could work today. I think you would
> need to write your own custom `LAST_ROW(x)` aggregation function, which
> would just return the value of the most recently aggregated row. With that
> you could write a query like this:
>
> SELECT
>   uid, count(*)
> FROM (
>   SELECT
>     *
>   FROM (
>     SELECT
>       uid, LAST_ROW(status) AS status
>     FROM
>       changelog
>     GROUP BY
>       uid, oid)
>   WHERE status = 'pending')
> GROUP BY
>   uid
>
> Where `changelog` is an append only stream with the following content:
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
>
>
> Besides that, you could also write your own relatively simple DataStream
> application to do the same thing.
>
> I’m CC’ing Timo, maybe he will have another better idea.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-8577
>
> On 18 Jan 2019, at 18:30, Gagan Agrawal  wrote:
>
> Hi,
> I have a requirement and need to understand if same can be achieved with
> Flink retract stream. Let's say we have stream with 4 attributes userId,
> orderId, status, event_time where orderId is unique and hence any change in
> same orderId updates previous value as below
>
> *Changelog* *Event Stream*
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
> *Snapshot view at time t6 (as viewed in mysql)*
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u2, o4, pending, t6
> (Here rows at time t3 and t5 are deleted as they have been updated for
> respective order ids)
>
> What I need is to maintain count of "Pending" orders against a user and if
> they go beyond configured threshold, then push that user and pending count
> to Kafka. Here there can be multiple updates to order status e.g Pending ->
> Success or Pending -> Failed. Also in some cases there may not be any
> change in status but we may still get a row (may be due to some other
> attribute update which we are not concerned about). So is it possible to
> have a running count in Flink as below at the respective event times? Here the
> pending count is decreased from 2 to 1 for user u1 at t4 since one of its
> orders changed status from Pending to Success. Similarly for user u2, at
> time t6, there was no change in the running count as there was no change in
> status for order o4.
>
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> *t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is
> decreased for u1)*
> t5 -> u1 : 1, u2 : 1
> *t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no
> change)*
>
> As I understand may be retract stream can achieve this. However I am not
> sure how. Any samples around this would be of great help.
>
> Gagan
>
>
>

-- 
Best Regards

Jeff Zhang
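
The running count Gagan asks for above comes down to remembering the last
status of each order and adjusting the owning user's counter only when an
order enters or leaves "pending". Below is a plain-Java sketch of that
bookkeeping, not a Flink job: PendingOrderCounter and its methods are
hypothetical names, and it assumes an order always belongs to the same user.
In a DataStream application the two maps would correspond to keyed state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping for "pending orders per user" over a changelog.
final class PendingOrderCounter {

    private final Map<String, String> lastStatusByOrder = new HashMap<>();
    private final Map<String, Integer> pendingByUser = new HashMap<>();

    /** Applies one changelog row and returns the user's pending count afterwards. */
    int update(String user, String order, String status) {
        String previous = lastStatusByOrder.put(order, status);
        boolean wasPending = "pending".equals(previous);
        boolean isPending = "pending".equals(status);
        // +1 when the order becomes pending, -1 when it stops being pending, else 0
        int delta = (isPending ? 1 : 0) - (wasPending ? 1 : 0);
        return pendingByUser.merge(user, delta, Integer::sum);
    }

    int pending(String user) {
        return pendingByUser.getOrDefault(user, 0);
    }

    public static void main(String[] args) {
        PendingOrderCounter counter = new PendingOrderCounter();
        String[][] changelog = {
            {"u1", "o1", "pending"}, {"u2", "o2", "failed"},
            {"u1", "o3", "pending"}, {"u1", "o3", "success"},
            {"u2", "o4", "pending"}, {"u2", "o4", "pending"},
        };
        for (String[] row : changelog) {
            int count = counter.update(row[0], row[1], row[2]);
            System.out.println(row[0] + " : " + count);
        }
    }
}
```

Replaying the six changelog rows from the thread gives u1 the counts 1, 2, 1
and leaves u2 at 1 after the repeated o4 update, matching the table in the
question. A threshold check before writing to Kafka would hang off the value
returned by update.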


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-21 Thread Jeff Zhang
Thanks Chesnay for raising this discussion thread. I think there are 3
major usage scenarios for the Flink binary distribution.

1. Use it to set up a standalone cluster
2. Use it to try out Flink features, for example via scala-shell or
sql-client
3. Downstream projects use it to integrate with their systems

I estimated the size of the flink-dist folder: the lib folder takes around
100M and the opt folder around 200M. Overall I agree with making flink-dist
thinner. The next question is which components to drop. I checked the opt
folder, and I think the filesystem and metrics components could be moved
out, because they are pluggable and, I think, only used in scenario 1
(setting up a standalone cluster). Other components such as flink-table,
flink-ml and flink-gelly should stay, IMHO, because new users may still use
them to try out Flink features. For me, scala-shell is the first option for
trying new features of Flink.



On Fri, Jan 18, 2019 at 7:34 PM Fabian Hueske wrote:

> Hi Chesnay,
>
> Thank you for the proposal.
> I think this is a good idea.
> We follow a similar approach already for Hadoop dependencies and
> connectors (although in application space).
>
> +1
>
> Fabian
>
> On Fri, Jan 18, 2019 at 10:59 AM Chesnay Schepler <
> ches...@apache.org> wrote:
>
>> Hello,
>>
>> the binary distribution that we release by now contains quite a lot of
>> optional components, including various filesystems, metric reporters and
>> libraries. Most users will only use a fraction of these, and as such
>> pretty much only increase the size of flink-dist.
>>
>> With Flink growing more and more in scope I don't believe it to be
>> feasible to ship everything we have with every distribution, and instead
>> suggest more of a "pick-what-you-need" model, where flink-dist is rather
>> lean and additional components are downloaded separately and added by
>> the user.
>>
>> This would primarily affect the /opt directory, but could also be
>> extended to cover flink-dist. For example, the yarn and mesos code could
>> be spliced out into separate jars that could be added to lib manually.
>>
>> Let me know what you think.
>>
>> Regards,
>>
>> Chesnay
>>
>>

-- 
Best Regards

Jeff Zhang


Re: Is there a way to get all flink build-in SQL functions

2019-01-25 Thread Jeff Zhang
I believe it makes sense to list the available UDFs programmatically, e.g.
users may want to see the available UDFs in sql-client. It would also benefit
other downstream projects that use Flink SQL. Besides that, I think Flink
should also provide an API for querying a UDF's description and usage.

On Fri, Jan 25, 2019 at 5:12 PM yinhua.dai wrote:

> Thanks Guys.
> I was just wondering if there is another way besides hard-coding the list :)
> Thanks anyway.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best Regards

Jeff Zhang


Re: Flink 1.7 Notebook Environment

2019-02-10 Thread Jeff Zhang
Hi Faizan,

I have implemented a Flink interpreter for Blink, which Alibaba recently
donated to the Flink community. Maybe you have seen the news.

Here are some tutorials that may interest you.

https://flink-china.org/doc/blink/ops/zeppelin.html
https://flink-china.org/doc/blink/quickstart/zeppelin_quickstart.html

And here's the code base: https://github.com/zjffdu/zeppelin/tree/blink_poc


On Mon, Feb 11, 2019 at 11:44 AM Faizan Ahmed wrote:

> Hi all,
> I have been searching around quite a bit and doing my own experiments to
> make the latest Flink release 1.7.1 to work with Apache Zeppelin however
> Apache Zeppelin's Flink interpreter is quite outdated (Flink 1.3). AFAIK
> its not possible to use Flink running on YARN via Zeppelin as it only works
> with a local cluster.
>
> Has anyone been able to run Flink's latest release on Zeppelin? If yes
> then please share some instructions/tutorial. If no then is there any other
> suitable notebook environment for running Flink (maybe Jupyter)? I want to
> prototype my ideas in Flink and since I'm coming from Spark background it
> would be really useful to have notebook environment for vaildation of flink
> apps.
>
> Looking forward to your response
>
> Thanks
>


-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] New Flink PMC member Thomas Weise

2019-02-12 Thread Jeff Zhang
Congrats Thomas !

On Tue, Feb 12, 2019 at 5:59 PM Fabian Hueske wrote:

> Hi everyone,
>
> On behalf of the Flink PMC I am happy to announce Thomas Weise as a new
> member of the Apache Flink PMC.
>
> Thomas is a long time contributor and member of our community.
> He is starting and participating in lots of discussions on our mailing
> lists, working on topics that are of joint interest of Flink and Beam, and
> giving talks on Flink at many events.
>
> Please join me in welcoming and congratulating Thomas!
>
> Best,
> Fabian
>


-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-14 Thread Jeff Zhang
cific
>>>> timeline. Instead, we share our vision for the future and major initiatives
>>>> that are receiving attention and give users and contributors an
>>>> understanding what they can look forward to.
>>>>
>>>> *Future Role of Table API and DataStream API*
>>>>   - Table API becomes first class citizen
>>>>   - Table API becomes primary API for analytics use cases
>>>>   * Declarative, automatic optimizations
>>>>   * No manual control over state and timers
>>>>   - DataStream API becomes primary API for applications and data
>>>> pipeline use cases
>>>>   * Physical, user controls data types, no magic or optimizer
>>>>   * Explicit control over state and time
>>>>
>>>> *Batch Streaming Unification*
>>>>   - Table API unification (environments) (FLIP-32)
>>>>   - New unified source interface (FLIP-27)
>>>>   - Runtime operator unification & code reuse between DataStream / Table
>>>>   - Extending Table API to make it convenient API for all analytical
>>>> use cases (easier mix in of UDFs)
>>>>   - Same join operators on bounded/unbounded Table API and DataStream
>>>> API
>>>>
>>>> *Faster Batch (Bounded Streams)*
>>>>   - Much of this comes via Blink contribution/merging
>>>>   - Fine-grained Fault Tolerance on bounded data (Table API)
>>>>   - Batch Scheduling on bounded data (Table API)
>>>>   - External Shuffle Services Support on bounded streams
>>>>   - Caching of intermediate results on bounded data (Table API)
>>>>   - Extending DataStream API to explicitly model bounded streams (API
>>>> breaking)
>>>>   - Add fine fault tolerance, scheduling, caching also to DataStream API
>>>>
>>>> *Streaming State Evolution*
>>>>   - Let all built-in serializers support stable evolution
>>>>   - First class support for other evolvable formats (Protobuf, Thrift)
>>>>   - Savepoint input/output format to modify / adjust savepoints
>>>>
>>>> *Simpler Event Time Handling*
>>>>   - Event Time Alignment in Sources
>>>>   - Simpler out-of-the box support in sources
>>>>
>>>> *Checkpointing*
>>>>   - Consistency of Side Effects: suspend / end with savepoint (FLIP-34)
>>>>   - Failed checkpoints explicitly aborted on TaskManagers (not only on
>>>> coordinator)
>>>>
>>>> *Automatic scaling (adjusting parallelism)*
>>>>   - Reactive scaling
>>>>   - Active scaling policies
>>>>
>>>> *Kubernetes Integration*
>>>>   - Active Kubernetes Integration (Flink actively manages containers)
>>>>
>>>> *SQL Ecosystem*
>>>>   - Extended Metadata Stores / Catalog / Schema Registries support
>>>>   - DDL support
>>>>   - Integration with Hive Ecosystem
>>>>
>>>> *Simpler Handling of Dependencies*
>>>>   - Scala in the APIs, but not in the core (hide in separate class
>>>> loader)
>>>>   - Hadoop-free by default
>>>>
>>>>

-- 
Best Regards

Jeff Zhang


Install 1.7.2 on EC2 - No task slots - 2019

2019-03-23 Thread Jeff Crane
When downloading the latest 1.7.2 and extracting to (a free) AMZ EC2, the 
daemon (./bin/start-cluster.sh) reports 0 task managers, 0 task slots, and 0 
avails, out of the box. All jobs fail. All ports are open to all traffic. 
Can anyone tell me what I missed?


Re: Install 1.7.2 on EC2 - No task slots - 2019

2019-03-26 Thread Jeff Crane
of main cluster component stdout file: /home/ec2-user/flink-1.7.2/log/flink-ec2-user-standalonesession-0-ip-10-0-0-204.us-west-1.compute.internal.out
2019-03-26 15:43:15,504 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at localhost:8081
2019-03-26 15:43:15,504 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - http://localhost:8081 was granted leadership with leaderSessionID=----
2019-03-26 15:43:15,504 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Web frontend listening at http://localhost:8081.
2019-03-26 15:43:15,670 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2019-03-26 15:43:15,713 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
2019-03-26 15:43:15,744 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - ResourceManager akka.tcp://flink@localhost:6123/user/resourcemanager was granted leadership with fencing token 
2019-03-26 15:43:15,745 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting the SlotManager.
2019-03-26 15:43:15,781 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Dispatcher akka.tcp://flink@localhost:6123/user/dispatcher was granted leadership with fencing token ----
2019-03-26 15:43:15,783 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Recovering all persisted jobs.

--On Monday, March 25, 2019, 1:23:45 AM PDT, Konstantin Knauf 
 wrote:

Hi Jeff, 

do you see any log files in the log directory of your Flink installation 
directory? If so, please share them. 

Cheers, 

Konstantin  

BucketAssigner - Confusion

2019-04-01 Thread Jeff Crane
I have had an issue understanding the documentation with regard to
BucketAssigner<IN, BucketID>:

BucketID getBucketId(IN element, BucketAssigner.Context context)
SimpleVersionedSerializer<BucketID> getSerializer()

First of all, I don't understand what the type "BucketID" means. I assume
that's the return type of getBucketId, which doesn't make sense. The
description says getBucketId (returns?) "A string representing the identifier
of the bucket". So BucketID is not a type, it's always a string?

Based on the docs, I implemented it like this, which doesn't write anything!

public final class CustomBucketAssigner implements BucketAssigner<MyEvent, String> {

    public String getBucketId(final MyEvent element, final Context context) {

        DateTime dateTimeL = new DateTime(context.currentWatermark());

        return String.join("_",
            String.valueOf(dateTimeL.getYear()),
            String.valueOf(dateTimeL.getMonthOfYear()),
            String.valueOf(dateTimeL.getDayOfMonth()),
            String.valueOf(dateTimeL.getHourOfDay()),
            String.valueOf(dateTimeL.getMinuteOfHour())
        );
    }

    // I assume <String> because BucketID is always string?
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

Can someone explain what BucketAssigner is supposed to do, in plainer English? I
don't think the docs are clear and I'm lost.
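
To make the bucket naming concrete, here is a minimal sketch of the same
year_month_day_hour_minute computation outside of Flink, using java.time
instead of Joda. The class name MinuteBucketIds, the UTC time zone and the
"no_watermark" fallback are assumptions for illustration only. As far as I
know, the watermark is Long.MIN_VALUE until the first watermark arrives, so
the sketch treats that value separately.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Hypothetical helper, not part of Flink: builds a bucket id from a timestamp.
class MinuteBucketIds {

    // Mirrors the year_month_day_hour_minute layout of the assigner above.
    static String bucketIdFor(long timestampMillis) {
        if (timestampMillis == Long.MIN_VALUE) {
            // Assumption: Long.MIN_VALUE means "no watermark yet";
            // the fallback bucket name is made up for this sketch.
            return "no_watermark";
        }
        ZonedDateTime t = Instant.ofEpochMilli(timestampMillis).atZone(ZoneOffset.UTC);
        return String.join("_",
                String.valueOf(t.getYear()),
                String.valueOf(t.getMonthValue()),
                String.valueOf(t.getDayOfMonth()),
                String.valueOf(t.getHour()),
                String.valueOf(t.getMinute()));
    }

    public static void main(String[] args) {
        System.out.println(bucketIdFor(0L));             // epoch start in UTC
        System.out.println(bucketIdFor(Long.MIN_VALUE)); // no watermark yet
    }
}
```

Inside a real assigner, getBucketId would return the result of such a helper;
the returned String is the bucket id.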

Re: BucketAssigner - Confusion

2019-04-02 Thread Jeff Crane
 According to my IDE (JetBrains), I get an error with the getBucketId(IN, Context)
signature requiring a return of String (Flink 1.7 libs), so I still don't think
BucketID is a variable type.

I still don't understand the role of:

public SimpleVersionedSerializer<String> getSerializer() {
    return SimpleVersionedStringSerializer.INSTANCE;
}

Where does that come into play, if getBucketId makes a string anyway?



On Monday, April 1, 2019, 11:44:14 AM PDT, Jeff Crane 
 wrote:  
 
 I have had an issue understanding the documentation with regard to
BucketAssigner<IN, BucketID>:

BucketID getBucketId(IN element, BucketAssigner.Context context)
SimpleVersionedSerializer<BucketID> getSerializer()

First of all, I don't understand what the type "BucketID" means. I assume
that's the return type of getBucketId, which doesn't make sense. The
description says getBucketId (returns?) "A string representing the identifier
of the bucket". So BucketID is not a type, it's always a string?

Based on the docs, I implemented it like this, which doesn't write anything!

public final class CustomBucketAssigner implements BucketAssigner<MyEvent, String> {

    public String getBucketId(final MyEvent element, final Context context) {

        DateTime dateTimeL = new DateTime(context.currentWatermark());

        return String.join("_",
            String.valueOf(dateTimeL.getYear()),
            String.valueOf(dateTimeL.getMonthOfYear()),
            String.valueOf(dateTimeL.getDayOfMonth()),
            String.valueOf(dateTimeL.getHourOfDay()),
            String.valueOf(dateTimeL.getMinuteOfHour())
        );
    }

    // I assume <String> because BucketID is always string?
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

Can someone explain what BucketAssigner is supposed to do, in plainer English? I
don't think the docs are clear and I'm lost.

[Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-17 Thread Jeff Zhang
Hi All,

I created FLINK-12214 <https://issues.apache.org/jira/browse/FLINK-12214> for
adding a JobListener (hook) to the Flink job lifecycle. Since this is a new
public API for Flink, I'd like to discuss it more widely in the community to
get more feedback.

The background and motivation is that I am integrating Flink into Apache
Zeppelin <http://zeppelin.apache.org/> (which is a notebook, in case you
don't know). I'd like to capture some job context (like the jobId) during the
lifecycle of a Flink job (submitted, executed, cancelled) so that I can
control the job in a more fine-grained way (e.g. I can capture the jobId
when the job is submitted, associate it with one paragraph, and when the
user clicks the cancel button, call the Flink cancel API to cancel
this job).

I believe other projects which integrate Flink would need a similar
mechanism. I plan to add an addJobListener API to
ExecutionEnvironment/StreamExecutionEnvironment so that users can add
customized hooks to the Flink job lifecycle.

Here's the draft JobListener interface.

public interface JobListener {

void onJobSubmitted(JobID jobId);

void onJobExecuted(JobExecutionResult jobResult);

void onJobCanceled(JobID jobId, String savepointPath);
}
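
To illustrate the notebook use case, here is a rough sketch of a listener
that remembers which paragraph submitted which job. It is only a sketch under
assumptions: JobListenerSketch is a stand-in for the draft interface with
JobID and JobExecutionResult replaced by plain Strings so that it compiles
without Flink, and ParagraphJobTracker and the paragraph ids are hypothetical
names, not Zeppelin or Flink classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the draft interface; JobID and JobExecutionResult are
// replaced by plain Strings so the sketch has no Flink dependency.
interface JobListenerSketch {
    void onJobSubmitted(String jobId);
    void onJobExecuted(String jobResult);
    void onJobCanceled(String jobId, String savepointPath);
}

// Hypothetical notebook-side listener: maps each job id to its paragraph.
class ParagraphJobTracker implements JobListenerSketch {
    private final String paragraphId;
    private final Map<String, String> jobToParagraph = new ConcurrentHashMap<>();

    ParagraphJobTracker(String paragraphId) {
        this.paragraphId = paragraphId;
    }

    @Override
    public void onJobSubmitted(String jobId) {
        // Remember the owner so a later cancel click can find the job.
        jobToParagraph.put(jobId, paragraphId);
    }

    @Override
    public void onJobExecuted(String jobResult) {
        // Nothing to track in this sketch.
    }

    @Override
    public void onJobCanceled(String jobId, String savepointPath) {
        jobToParagraph.remove(jobId);
    }

    // Returns the paragraph that submitted the job, or null if unknown.
    String paragraphFor(String jobId) {
        return jobToParagraph.get(jobId);
    }

    public static void main(String[] args) {
        ParagraphJobTracker tracker = new ParagraphJobTracker("paragraph_1");
        tracker.onJobSubmitted("job-42");
        System.out.println(tracker.paragraphFor("job-42"));
        tracker.onJobCanceled("job-42", null);
        System.out.println(tracker.paragraphFor("job-42"));
    }
}
```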

Let me know your comments and concerns, thanks.


-- 
Best Regards

Jeff Zhang


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-19 Thread Jeff Zhang
>>>  The ExecutionEnvironment is usually used by the user who writes the
code and this person (I assume) would not be really interested in these
callbacks.

Usually ExecutionEnvironment is used by the user who writes the code, but it
doesn't need to be created and configured by this person. E.g. in a Zeppelin
notebook, the ExecutionEnvironment is created by Zeppelin and the user just
uses it to write a Flink program. You are right that the end
user would not be interested in these callbacks, but a third-party library
that integrates with Zeppelin would be.

>>> In your case, it could be sufficient to offer some hooks for the
ClusterClient or being able to provide a custom ClusterClient.

Actually, in my initial PR (https://github.com/apache/flink/pull/8190), I do
pass the JobListener to ClusterClient and invoke it there.
But IMHO, ClusterClient is not supposed to be a public API for users. Instead,
JobClient is the public API that users should use to control jobs. So adding
hooks to ClusterClient directly and providing a custom ClusterClient doesn't
make sense to me. IIUC, you are suggesting the following approach
 env.getClusterClient().addJobListener(jobListener)
but I don't see its benefit compared to this:
 env.addJobListener(jobListener)

Overall, I think adding hooks is orthogonal to fine-grained job control.
I agree that we should refactor the Flink client component, but I don't
think it would affect the JobListener interface. What do you think?




On Thu, Apr 18, 2019 at 8:57 PM Till Rohrmann wrote:

> Thanks for starting this discussion Jeff. I can see the need for
> additional hooks for third party integrations.
>
> The thing I'm wondering is whether we really need/want to expose a
> JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
> usually used by the user who writes the code and this person (I assume)
> would not be really interested in these callbacks. If he would, then one
> should rather think about a better programmatic job control where the
> `ExecutionEnvironment#execute` call returns a `JobClient` instance.
> Moreover, we would effectively make this part of the public API and every
> implementation would need to offer it.
>
> In your case, it could be sufficient to offer some hooks for the
> ClusterClient or being able to provide a custom ClusterClient. The
> ClusterClient is the component responsible for the job submission and
> retrieval of the job result and, hence, would be able to signal when a job
> has been submitted or completed.
>
> Cheers,
> Till
>
> On Thu, Apr 18, 2019 at 8:57 AM vino yang  wrote:
>
>> Hi Jeff,
>>
>> I personally like this proposal. From the perspective of programmability,
>> the JobListener can make the third program more appreciable.
>>
>> The scene where I need the listener is the Flink cube engine for Apache
>> Kylin. In the case, the Flink job program is embedded into the Kylin's
>> executable context.
>>
>> If we could have this listener, it would be easier to integrate with
>> Kylin.
>>
>> Best,
>> Vino
>>
>> On Thu, Apr 18, 2019 at 1:30 PM Jeff Zhang wrote:
>>
>>>
>>> Hi All,
>>>
>>> I created FLINK-12214
>>> <https://issues.apache.org/jira/browse/FLINK-12214> for adding
>>> JobListener (hook) in flink job lifecycle. Since this is a new public api
>>> for flink, so I'd like to discuss it more widely in community to get more
>>> feedback.
>>>
>>> The background and motivation is that I am integrating flink into apache
>>> zeppelin <http://zeppelin.apache.org/>(which is a notebook in case you
>>> don't know). And I'd like to capture some job context (like jobId) in the
>>> lifecycle of flink job (submission, executed, cancelled) so that I can
>>> manipulate job in more fined grained control (e.g. I can capture the jobId
>>> when job is submitted, and then associate it with one paragraph, and when
>>> user click the cancel button, I can call the flink cancel api to cancel
>>> this job)
>>>
>>> I believe other projects which integrate flink would need similar
>>> mechanism. I plan to add api addJobListener in
>>> ExecutionEnvironment/StreamExecutionEnvironment so that user can add
>>> customized hook in flink job lifecycle.
>>>
>>> Here's draft interface JobListener.
>>>
>>> public interface JobListener {
>>>
>>> void onJobSubmitted(JobID jobId);
>>>
>>> void onJobExecuted(JobExecutionResult jobResult);
>>>
>>> void onJobCanceled(JobID jobId, String savepointPath);
>>> }
>>>
>>> Let me know your comment and concern, thanks.
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>

-- 
Best Regards

Jeff Zhang


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-23 Thread Jeff Zhang
Hi Till,

IMHO, allowing users to add hooks involves 2 steps.
1. Provide the hook interface, and call these hooks in Flink (ClusterClient) at
the right place. This should be done by the framework (Flink).
2. Implement new hooks and add/register them with the
framework (Flink).

What I am doing is step 1, which should be done by Flink; step 2 is done by
users. But IIUC, your suggestion of using a custom ClusterClient seems to mix
these 2 steps together. Say I'd like to add new hooks: I would have to implement
a new custom ClusterClient, add the new hooks and call them in the custom
ClusterClient at the right place.
This doesn't make sense to me. A user who wants to add hooks is not
supposed to understand the mechanism of ClusterClient, and should not touch
ClusterClient. What do you think?
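
The split between the two steps can be sketched roughly as follows. This is
a simplified illustration, not Flink code: EnvironmentSketch, its submit
method and the made-up job id format are all hypothetical, and the hook is
reduced to a single "job submitted" callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Step 1 (framework side): own the hook list and call the hooks at the right place.
class EnvironmentSketch {
    private final List<Consumer<String>> submitHooks = new ArrayList<>();

    // Registration point offered to users.
    void addJobListener(Consumer<String> onJobSubmitted) {
        submitHooks.add(onJobSubmitted);
    }

    // Stand-in for job submission; returns a made-up job id.
    String submit(String jobName) {
        String jobId = "job-" + jobName;
        for (Consumer<String> hook : submitHooks) {
            hook.accept(jobId); // the framework decides when hooks fire
        }
        return jobId;
    }
}

class HookDemo {
    public static void main(String[] args) {
        EnvironmentSketch env = new EnvironmentSketch();
        List<String> seen = new ArrayList<>();
        // Step 2 (user side): implement a hook and register it, nothing more.
        env.addJobListener(seen::add);
        env.submit("wordcount");
        System.out.println(seen);
    }
}
```

In this shape the user never touches the submission code; only the
registration call is visible to them.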




On Tue, Apr 23, 2019 at 4:24 PM Till Rohrmann wrote:

> I think we should not expose the ClusterClient configuration via the
> ExecutionEnvironment (env.getClusterClient().addJobListener) because this
> is effectively the same as exposing the JobListener interface directly on
> the ExecutionEnvironment. Instead I think it could be possible to provide a
> ClusterClient factory which is picked up from the Configuration or some
> other mechanism for example. That way it would not need to be exposed via
> the ExecutionEnvironment at all.
>
> Cheers,
> Till
>
> On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:
>
>> >>>  The ExecutionEnvironment is usually used by the user who writes the
>> code and this person (I assume) would not be really interested in these
>> callbacks.
>>
>> Usually ExecutionEnvironment is used by the user who write the code, but
>> it doesn't needs to be created and configured by this person. e.g. in
>> Zeppelin notebook, ExecutionEnvironment is created by Zeppelin, user just
>> use ExecutionEnvironment to write flink program.  You are right that the
>> end user would not be interested in these callback, but the third party
>> library that integrate with zeppelin would be interested in these callbacks.
>>
>> >>> In your case, it could be sufficient to offer some hooks for the
>> ClusterClient or being able to provide a custom ClusterClient.
>>
>> Actually in my initial PR (https://github.com/apache/flink/pull/8190), I
>> do pass JobListener to ClusterClient and invoke it there.
>> But IMHO, ClusterClient is not supposed be a public api for users.
>> Instead JobClient is the public api that user should use to control job. So
>> adding hooks to ClusterClient directly and provide a custom ClusterClient
>> doesn't make sense to me. IIUC, you are suggesting the following approach
>>  env.getClusterClient().addJobListener(jobListener)
>> but I don't see its benefit compared to this.
>>  env.addJobListener(jobListener)
>>
>> Overall, I think adding hooks is orthogonal with fine grained job
>> control. And I agree that we should refactor the flink client component,
>> but I don't think it would affect the JobListener interface. What do you
>> think ?
>>
>>
>>
>>
>> On Thu, Apr 18, 2019 at 8:57 PM Till Rohrmann wrote:
>>
>>> Thanks for starting this discussion Jeff. I can see the need for
>>> additional hooks for third party integrations.
>>>
>>> The thing I'm wondering is whether we really need/want to expose a
>>> JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
>>> usually used by the user who writes the code and this person (I assume)
>>> would not be really interested in these callbacks. If he would, then one
>>> should rather think about a better programmatic job control where the
>>> `ExecutionEnvironment#execute` call returns a `JobClient` instance.
>>> Moreover, we would effectively make this part of the public API and every
>>> implementation would need to offer it.
>>>
>>> In your case, it could be sufficient to offer some hooks for the
>>> ClusterClient or being able to provide a custom ClusterClient. The
>>> ClusterClient is the component responsible for the job submission and
>>> retrieval of the job result and, hence, would be able to signal when a job
>>> has been submitted or completed.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 18, 2019 at 8:57 AM vino yang  wrote:
>>>
>>>> Hi Jeff,
>>>>
>>>> I personally like this proposal. From the perspective of
>>>> programmability, the JobListener can make the third program more
>>>> appreciable.
>>>>
>>>> The scene where I need the listener is the Flink cube engine for Apache
>>

Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-25 Thread Jeff Zhang
Hi Becket,

Thanks for your feedback. See my comments inline.

>>> How do user specify the listener?
What I propose is to register the JobListener in ExecutionEnvironment. I don't
think we should make ClusterClient a public API.

>>> Where should the listener run?
I don't think it is proper to run the listener in the JobMaster. The listener is
user code, and it usually depends on the user's other components. So running
it on the client side makes more sense to me.

>>> What should be reported to the Listener?
I am open to adding other methods to this JobListener. But for now, I am afraid
the ExecutionEnvironment is not aware of failover, so it is not possible to
report failover events.

>>> What can the listeners do on notifications?
Do you mean passing the JobGraph to these methods, like the following? (I am
afraid JobGraph is not a public and stable API, so we should not expose it to
users.)

public interface JobListener {

void onJobSubmitted(JobGraph graph, JobID jobId);

void onJobExecuted(JobGraph graph, JobExecutionResult jobResult);

void onJobCanceled(JobGraph graph, JobID jobId, String savepointPath);
}


On Thu, Apr 25, 2019 at 7:40 PM Becket Qin wrote:

> Thanks for the proposal, Jeff. Adding a listener to allow users handle
> events during the job lifecycle makes a lot of sense to me.
>
> Here are my two cents.
>
> * How do user specify the listener? *
> It is not quite clear to me whether we consider ClusterClient as a public
> interface? From what I understand ClusterClient is not a public interface
> right now. In contrast, ExecutionEnvironment is the de facto interface for
> administrative work. After job submission, it is essentially bound to a job
> as an administrative handle. Given this current state, personally I feel
> acceptable to have the listener registered to the ExecutionEnvironment.
>
> * Where should the listener run? *
> If the listener runs on the client side, the client have to be always
> connected to the Flink cluster. This does not quite work if the Job is a
> streaming job. Should we provide the option to run the listener in
> JobMaster as well?
>
> * What should be reported to the Listener? *
> Besides the proposed APIs, does it make sense to also report events such
> as failover?
>
> * What can the listeners do on notifications? *
> If the listeners are expected to do anything on the job, should some
> helper class to manipulate the jobs be passed to the listener method?
> Otherwise users may not be able to easily take action.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
> On Wed, Apr 24, 2019 at 2:43 PM Jeff Zhang  wrote:
>
>> Hi Till,
>>
>> IMHO, allow adding hooks involves 2 steps.
>> 1. Provide hook interface, and call these hook in flink (ClusterClient)
>> at the right place. This should be done by framework (flink)
>> 2. Implement new hook implementation and add/register them into
>> framework(flink)
>>
>> What I am doing is step 1 which should be done by flink, step 2 is done
>> by users. But IIUC, your suggestion of using custom ClusterClient seems
>> mixing these 2 steps together. Say I'd like to add new hooks, I have to
>> implement a new custom ClusterClient, add new hooks and call them in the
>> custom ClusterClient at the right place.
>> This doesn't make sense to me. For a user who want to add hooks, he is
>> not supposed to understand the mechanism of ClusterClient, and should not
>> touch ClusterClient. What do you think ?
>>
>>
>>
>>
>> On Tue, Apr 23, 2019 at 4:24 PM Till Rohrmann wrote:
>>
>>> I think we should not expose the ClusterClient configuration via the
>>> ExecutionEnvironment (env.getClusterClient().addJobListener) because this
>>> is effectively the same as exposing the JobListener interface directly on
>>> the ExecutionEnvironment. Instead I think it could be possible to provide a
>>> ClusterClient factory which is picked up from the Configuration or some
>>> other mechanism for example. That way it would not need to be exposed via
>>> the ExecutionEnvironment at all.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:
>>>
>>>> >>>  The ExecutionEnvironment is usually used by the user who writes
>>>> the code and this person (I assume) would not be really interested in these
>>>> callbacks.
>>>>
>>>> Usually ExecutionEnvironment is used by the user who write the code,
>>>> but it doesn't needs to be created and configured by this person. e.g. in
>>>> Zeppelin notebook, ExecutionEnvironment is created by Zeppelin, user just
>>>>

Re: Zeppelin

2019-04-25 Thread Jeff Zhang
Thanks Dawid,

Hi Sergey,

I am working on updating the Flink interpreter of Zeppelin to support Flink
1.9 (supposed to be released this summer).
I haven't verified the current Flink interpreter of Zeppelin 0.9
against Flink 1.8. Could you show the full interpreter log? And what is
the size of your input file?



On Thu, Apr 25, 2019 at 6:31 PM Dawid Wysakowicz wrote:

> Hi Sergey,
>
> I am not very familiar with Zepellin. But I know Jeff (cc'ed) is working
> on integrating Flink with some notebooks. He might be able to help you.
>
> Best,
>
> Dawid
> On 25/04/2019 08:42, Smirnov Sergey Vladimirovich (39833) wrote:
>
> Hello,
>
>
>
> Trying to link Zeppelin 0.9 with Flink 1.8. It`s a small dev cluster
> deployed in standalone manner.
>
> Got the same error as described here
> https://stackoverflow.com/questions/54257671/runnning-a-job-in-apache-flink-standalone-mode-on-zeppelin-i-have-this-error-to
>
> Would appreciate for any support for helping to resolve that problem.
>
>
>
> Regards,
>
> Sergey
>
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Ask about running Flink sql-client.sh

2019-05-01 Thread Jeff Zhang
Try ./sql-client.sh embedded



On Wed, May 1, 2019 at 8:28 PM Rad Rad wrote:

>
> Hi
> I would ask about the command for running sql-client.sh
>
> These  commands don't work
> ./sql-client.sh OR ./flink sql-client
>
> Regards.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best Regards

Jeff Zhang


What does flink session mean ?

2019-06-02 Thread Jeff Zhang
Hi Folks,


When I read the Flink client API code, the concept of a session is a little
vague and unclear to me. It looks like the session concept only applies
in batch mode (I only see it in ExecutionEnvironment, not in
StreamExecutionEnvironment). In local mode
(LocalExecutionEnvironment), starting a new session starts a new
MiniCluster, but in remote mode (RemoteExecutionEnvironment), starting a
new session just starts a new ClusterClient instead of a new
cluster. So I am confused about what a Flink session really means. Could anyone
help me understand this? Thanks.




-- 
Best Regards

Jeff Zhang


Re: What does flink session mean ?

2019-06-04 Thread Jeff Zhang
Thanks for the reply, @Till Rohrmann. Regarding
reusing computed results: I think the JM keeps all the metadata of intermediate
data, and interactive programming is also trying to reuse computed results.
It looks like it may not be necessary to introduce the session concept as
long as we can achieve reuse of computed results. Let me know if I understand it
correctly.



On Tue, Jun 4, 2019 at 4:03 PM Till Rohrmann wrote:

> Hi Jeff,
>
> the session functionality which you find in Flink's client are the
> remnants of an uncompleted feature which was abandoned. The idea was that
> one could submit multiple parts of a job to the same cluster where these
> parts are added to the same ExecutionGraph. That way we wanted to allow to
> reuse computed results when using a notebook for ad-hoc queries, for
> example. But as I said, this feature has never been completed.
>
> Cheers,
> Till
>
> On Sun, Jun 2, 2019 at 3:20 PM Jeff Zhang  wrote:
>
>>
>> Hi Folks,
>>
>>
>> When I read the flink client api code, the concept of session is a little
>> vague and unclear to me. It looks like the session concept is only applied
>> in batch mode (I only see it in ExecutionEnvironment but not in
>> StreamExecutionEnvironment). But for local mode
>> (LocalExecutionEnvironment), starting one new session is starting one new
>> MiniCluster, but in remote mode (RemoteExecutionEnvironment), starting one
>> new session is just starting one new ClusterClient instead of one new
>> cluster. So I am confused what does flink session really mean. Could anyone
>> help me understand this ? Thanks.
>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] Deprecate previous Python APIs

2019-06-11 Thread Jeff Zhang
+1

On Tue, Jun 11, 2019 at 9:30 PM Stephan Ewen wrote:

> Hi all!
>
> I would suggest to deprecating the existing python APIs for DataSet and
> DataStream API with the 1.9 release.
>
> Background is that there is a new Python API under development.
> The new Python API is initially against the Table API. Flink 1.9 will
> support Table API programs without UDFs, 1.10 is planned to support UDFs.
> Future versions would support also the DataStream API.
>
> In the long term, Flink should have one Python API for DataStream and
> Table APIs. We should not maintain multiple different implementations and
> confuse users that way.
> Given that the existing Python APIs are a bit limited and not under active
> development, I would suggest to deprecate them in favor of the new API.
>
> Best,
> Stephan
>
>

-- 
Best Regards

Jeff Zhang


Re: Building some specific modules in flink

2019-06-24 Thread Jeff Zhang
You need to specify flink-dist in -pl. Module flink-dist will build the
flink binary distribution.

On Tue, Jun 25, 2019 at 9:14 AM syed wrote:

> Hi;
> I am trying to modify some core functionalities of flink for my through
> understanding about flink.  I already build the flink from source, now I am
> looking to build only a few modules which I have modified. Is this
> possible,
> or every time I have to build the flink in full (all modules). As it takes
> me about 30-35 minutes to build the flink in full.
>
> Specifically, I have modified some classes in *flink-streaming-java* and
> *flink-runtime* modules. I am looking to build only these two modules and
> integrate these into already build flink (all modules). I alrady tried
> using
> –pl option using mvn, it installs these modules but changes are not updated
> in already build binaries.
> Please guide me how can I do this.
> Kind regards;
> syed
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best Regards

Jeff Zhang


Re: Apache Flink - Running application with different flink configurations (flink-conf.yml) on EMR

2019-06-28 Thread Jeff Zhang
This is because Flink doesn't unify execution across different environments.
The community has discussed before how to enhance the Flink client
API. The initial proposal is to introduce FlinkConf, which contains all the
configuration, so that we can unify execution in all environments (IDE,
CLI, SQL Client, Scala Shell, downstream projects).

Here's sample code for the proposed API:

val conf = new FlinkConf().setProperty("key_1", "value_1")   // create FlinkConf

val env = new ExecutionEnvironment(conf)   // create ExecutionEnvironment

val jobId = env.submit(...)   // non-blocking job submission (detached mode)

val jobStatus = env.getClusterClient().queryJobStatus(jobId)   // approach 1: query job status via ClusterClient

val jobStatus = env.queryJobStatus(jobId)   // approach 2: query job status via ExecutionEnvironment


You can refer to this document for more details:

https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
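
To make the precedence discussed in this thread concrete (values from
flink-conf.yaml being overridden by dynamic properties such as -yD on
'flink run'), here is a minimal plain-Python model. It is only a sketch and
not Flink code: the helper names load_flink_conf and apply_dynamic_properties
are made up for illustration, the parser assumes flat "key: value" lines, and
only the attached -yDkey=value form is handled.

```python
def load_flink_conf(text):
    """Parse flat 'key: value' lines, skipping blank lines and # comments."""
    conf = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if not line or ":" not in line:
            continue
        key, value = line.split(":", 1)
        conf[key.strip()] = value.strip()
    return conf


def apply_dynamic_properties(conf, argv, prefix="-yD"):
    """Return a copy of conf with every '<prefix>key=value' argument applied."""
    merged = dict(conf)
    for arg in argv:
        if arg.startswith(prefix) and "=" in arg:
            key, value = arg[len(prefix):].split("=", 1)
            merged[key] = value
    return merged


# Hypothetical file content and command-line arguments, for illustration only.
base = load_flink_conf("parallelism.default: 8\ntaskmanager.numberOfTaskSlots: 4\n")
effective = apply_dynamic_properties(base, ["-yDparallelism.default=2"])
print(effective["parallelism.default"])
```

In this model only arguments that reach the launcher are merged, which matches
the point made in the quoted replies below that JVM arguments set in an IDE run
configuration are not equivalent to -D/-yD.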




On Fri, Jun 28, 2019 at 10:28 PM Xintong Song wrote:

> Hi, Singh,
>
> I don't think that should work. The -D or -yD parameters need to be
> passed to the Flink start-up scripts or the "flink run" command. I don't
> think the IntelliJ VM arguments are equivalent to that. In fact, I'm not
> aware of any method to set "-D" parameters when running jobs in the IDE.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jun 28, 2019 at 8:45 PM M Singh  wrote:
>
>> Hi Xintong:
>>
>> I passed the -Dparallelism.default=2 in the  run configuration VM
>> arguments for IntelliJ.
>>
>> So what I am looking for is a way to overwrite the config parameters
>> which are defined in the flink-config.yaml file (parallelism.default is
>> just an example) which would be picked up regardless of the env (eg:
>> locally, on yarn or IDE).  When I run the application in IDE (locally) with
>> the above mentioned VM parameter, the StreamExecutionEnvironment.config
>> does not show this value and the Flink UI shows configuration parameter
>> parallelism as 8.  Is there any other place where I can see the parameter
>> settings ?
>>
>> Thanks.
>>
>> On Thursday, June 27, 2019, 11:58:28 PM EDT, Xintong Song <
>> tonysong...@gmail.com> wrote:
>>
>>
>> Could you provide some more details on how you run your job with -D
>> options in IDE?
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Fri, Jun 28, 2019 at 5:03 AM M Singh  wrote:
>>
>> Hi Xintong:  Thanks for your pointers.
>>
>> I tried -Dparallelism.default=2 locally (in IDE) and it did not work.  Do
>> you know if there is a common way that would work both for emr, locally and
>> ide ?
>>
>> Thanks again.
>>
>> On Thursday, June 27, 2019, 12:03:06 AM EDT, Xintong Song <
>> tonysong...@gmail.com> wrote:
>>
>>
>> Hi Singh,
>>
>> You can use the environment variable "FLINK_CONF_DIR" to specify path to
>> the directory of config files. You can also override config options with
>> command line arguments prefixed -D (for yarn-session.sh) or -yD (for 'flink
>> run' command).
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Jun 26, 2019 at 9:13 PM M Singh  wrote:
>>
>> Hi:
>>
>> I have a single EMR cluster with Flink and want to run multiple
>> applications on it with different flink configurations.  Is there a way to
>>
>> 1. Pass the config file name for each application, or
>> 2. Overwrite the config parameters via command line arguments for the
>> application.  This is similar to how we can overwrite the default
>> parameters in spark
>>
>> I searched the documents and have tried using ParameterTool with the
>> config parameter names, but it has not worked as yet.
>>
>> Thanks for your help.
>>
>> Mans
>>
>>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread Jeff Zhang
Congrats, Rong!


On Fri, Jul 12, 2019 at 10:08 AM vino yang wrote:

> congratulations Rong Rong!
>
> On Thu, Jul 11, 2019 at 10:25 PM Fabian Hueske wrote:
>
>> Hi everyone,
>>
>> I'm very happy to announce that Rong Rong accepted the offer of the Flink
>> PMC to become a committer of the Flink project.
>>
>> Rong has been contributing to Flink for many years, mainly working on SQL
>> and Yarn security features. He's also frequently helping out on the
>> user@f.a.o mailing lists.
>>
>> Congratulations Rong!
>>
>> Best, Fabian
>> (on behalf of the Flink PMC)
>>
>

-- 
Best Regards

Jeff Zhang


Re: user test can't run flink-1.8.1 wordcount

2019-07-12 Thread Jeff Zhang
You need to provide more details (such as how it doesn't work); otherwise it
is difficult for people to help you.



On Fri, Jul 12, 2019 at 3:52 PM ❤ <799326...@qq.com> wrote:

> Dear all,
> I have an issue with a Flink 1.8.1 HA cluster when a user outside the
> cluster runs test cases from the CLI client. For instance, the command
> "flink run WordCounter.jar" doesn't work. So,
> could you give me some successful examples, please?
>
>
> Thanks!
>


-- 
Best Regards

Jeff Zhang


Re: Jython support for Flink

2019-07-19 Thread Jeff Zhang
Hi Dante,

Flink 1.9 supports the Python API, which may be what you want. See
https://ci.apache.org/projects/flink/flink-docs-master/tutorials/python_table_api.html


On Fri, Jul 19, 2019 at 10:40 PM Dante Van den Broeke wrote:

> Dear,
>
>
> I'm a student currently working on a project involving Apache Kafka and
> Flink. The project itself revolves around path prediction and machine
> learning for websites. To test a proof of concept I set up a Kafka server
> locally (the goal is to extend this to a Google Cloud server or similar later)
> and a Kafka producer (written in Java, as an IntelliJ IDEA project). The producer
> would send JSON data (currently just a local file, but later JSON data from
> the website itself) to a Flink-Kafka connection, and the data transformation
> (key-windowed by user id) would then happen in the Flink framework.
>
>
> The problem I'm facing, however, is that I wrote all the algorithms for
> transforming the data in Python and I'm struggling with initializing a
> Jython environment to set up the Flink-Kafka connection.
>
> I was wondering whether there is a working example for this setup or
> some documentation regarding the framework, as I'm struggling to find much
> documentation for my application online.
>
>
> thanks in advance.
>
>
> kind regards,
>
> Dante Van den Broeke
>
>

-- 
Best Regards

Jeff Zhang


Re: [SURVEY] How many people implement Flink job based on the interface Program?

2019-07-22 Thread Jeff Zhang
Hi Flavio,

Based on the discussion in the tickets you mentioned above, the
program-class attribute was a mistake and the community intends to replace
it with main-class.

Deprecating the Program interface is part of the work on the new Flink client
API. IIUC, your requirements are not so complicated, and we can implement them
in the new Flink client API. How about listing your requirements so that we can
discuss how to support them in the new client API? BTW, I guess most of your
requirements stem from your Flink job server; it would be helpful if you could
provide more info about it. Thanks



On Mon, Jul 22, 2019 at 8:59 PM Flavio Pompermaier wrote:

> Hi Tison,
> we use a modified version of the Program interface to enable a web UI to
> properly detect and run Flink jobs contained in a jar + their parameters.
> As stated in [1], we detect multiple Main classes per jar by handling an
> extra comma-separated Manifest entry (i.e. 'Main-classes').
>
> As mentioned on the discussion on the dev ML, our revised Program
> interface looks like this:
>
> public interface FlinkJob {
>   String getDescription();
>   List<FlinkJobParameter> getParameters();
>   boolean isStreamingOrBatch();
> }
>
> public class FlinkJobParameter {
>   private String paramName;
>   private String paramType = "string";
>   private String paramDesc;
>   private String paramDefaultValue;
>   private Set<String> choices;
>   private boolean mandatory;
> }
>
> I've also opened some JIRA issues related to this topic:
>
> [1] https://issues.apache.org/jira/browse/FLINK-10864
> [2] https://issues.apache.org/jira/browse/FLINK-10862
> [3] https://issues.apache.org/jira/browse/FLINK-10879.
>
> Best,
> Flavio
>
>
> On Mon, Jul 22, 2019 at 1:46 PM Zili Chen  wrote:
>
>> Hi guys,
>>
>> We want to have an accurate idea of how many people are implementing
>> Flink job based on the interface Program, and how they actually
>> implement it.
>>
>> The reason I ask for the survey is from this thread[1] where we notice
>> this codepath is stale and less useful than it should be. As it is an
>> interface marked as @PublicEvolving it is originally aimed at serving
>> as user interface. Thus before doing deprecation or dropping, we'd like
>> to see if there are users implementing their job based on this
>> interface(org.apache.flink.api.common.Program) and if there is any,
>> we are curious about how it is used.
>>
>> If little or none of Flink user based on this interface, we would
>> propose deprecating or dropping it.
>>
>> I really appreciate your time and your insight.
>>
>> Best,
>> tison.
>>
>> [1]
>> https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E
>>
>
>
>

-- 
Best Regards

Jeff Zhang


Re: [SURVEY] How many people implement Flink job based on the interface Program?

2019-07-23 Thread Jeff Zhang
Thanks Flavio,

I get most of your points except one:

   - Get the list of jobs contained in jar (ideally this is true for
   every engine beyond Spark or Flink)

Just curious how you submit jobs via the REST API: if there are multiple
jobs in one jar, do you submit the jar once and then submit jobs
multiple times?
And is there any relationship between the jobs in the same jar?
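
For context, the comma-separated 'Main-classes' manifest entry mentioned in
the quoted message could be read roughly as follows. This is a sketch under
assumptions, not Flink or job-server code: list_main_classes is a hypothetical
helper, the class names are placeholders, and the jar is built in memory only
so that the snippet runs on its own.

```python
import io
import zipfile


def list_main_classes(jar_bytes, attribute="Main-classes"):
    """Return the class names listed in a comma-separated manifest attribute."""
    with zipfile.ZipFile(io.BytesIO(jar_bytes)) as jar:
        manifest = jar.read("META-INF/MANIFEST.MF").decode("utf-8")
    # Long manifest values continue on following lines that start with a space.
    unfolded = manifest.replace("\r\n", "\n").replace("\n ", "")
    for line in unfolded.splitlines():
        name, sep, value = line.partition(":")
        if sep and name.strip().lower() == attribute.lower():
            return [c.strip() for c in value.split(",") if c.strip()]
    return []


# Build a tiny in-memory jar so the sketch does not need a real artifact.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as jar:
    jar.writestr(
        "META-INF/MANIFEST.MF",
        "Manifest-Version: 1.0\nMain-classes: com.example.JobA,\n com.example.JobB\n",
    )
jobs = list_main_classes(buffer.getvalue())
print(jobs)
```

A UI or job server could call such a helper once per uploaded jar and then
submit each listed class as a separate job.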



On Tue, Jul 23, 2019 at 4:01 PM Flavio Pompermaier wrote:

> Hi Jeff, the thing about the manifest is really about having a way to
> list multiple main classes in the jar (without the need to inspect every
> Java class or forcing a 1-to-1 mapping between jar and job like it is now).
> My requirements were driven by the UI we're using in our framework:
>
>    - Get the list of jobs contained in the jar (ideally this is true for
>    every engine beyond Spark or Flink)
>    - Get the list of required/optional parameters for each job
>    - Besides the optionality of a parameter, each parameter should
>    include a help description, a type (to validate the input param), a
>    default value and a set of choices (when there's a limited number of
>    options available)
>    - obviously the job server should be able to submit/run/cancel/monitor
>    a job and upload/delete the uploaded jars
>    - the job server should not depend on any target platform dependency
>    (Spark or Flink) beyond the rest client: at the moment the rest client
>    requires a lot of core libs (indeed because it needs to submit the job
>    graph/plan)
>    - in our vision, the flink client should be something like Apache Livy
>    (https://livy.apache.org/)
>    - One of the biggest limitations we face when running a Flink job
>    from the REST API is the fact that the job can't do anything after
>    env.execute() while we need to call an external service to signal that the
>    job has ended + some other details
>
> Best,
> Flavio
>
> On Tue, Jul 23, 2019 at 3:44 AM Jeff Zhang  wrote:
>
>> Hi Flavio,
>>
>> Based on the discussion in the tickets you mentioned above, the
>> program-class attribute was a mistake and community is intended to use
>> main-class to replace it.
>>
>> Deprecating Program interface is a part of work of flink new client api.
>> IIUC, your requirements are not so complicated. We can implement that in
>> the new flink client api. How about listing your requirement, and let's
>> discuss how we can make it in the new flink client api. BTW, I guess most
>> of your requirements are based on your flink job server, It would be
>> helpful if you could provide more info about your flink job server. Thanks
>>
>>
>>
>> On Mon, Jul 22, 2019 at 8:59 PM Flavio Pompermaier wrote:
>>
>>> Hi Tison,
>>> we use a modified version of the Program interface to enable a web UI to
>>> properly detect and run Flink jobs contained in a jar + their parameters.
>>> As stated in [1], we detect multiple Main classes per jar by handling an
>>> extra comma-separated Manifest entry (i.e. 'Main-classes').
>>>
>>> As mentioned on the discussion on the dev ML, our revised Program
>>> interface looks like this:
>>>
>>> public interface FlinkJob {
>>>   String getDescription();
>>>   List<FlinkJobParameter> getParameters();
>>>   boolean isStreamingOrBatch();
>>> }
>>>
>>> public class FlinkJobParameter {
>>>   private String paramName;
>>>   private String paramType = "string";
>>>   private String paramDesc;
>>>   private String paramDefaultValue;
>>>   private Set<String> choices;
>>>   private boolean mandatory;
>>> }
>>>
>>> I've also opened some JIRA issues related to this topic:
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-10864
>>> [2] https://issues.apache.org/jira/browse/FLINK-10862
>>> [3] https://issues.apache.org/jira/browse/FLINK-10879.
>>>
>>> Best,
>>> Flavio
>>>
>>>
>>> On Mon, Jul 22, 2019 at 1:46 PM Zili Chen  wrote:
>>>
>>>> Hi guys,
>>>>
>>>> We want to have an accurate idea of how many people are implementing
>>>> Flink job based on the interface Program, and how they actually
>>>> implement it.
>>>>
>>>> The reason I ask for the survey is from this thread[1] where we notice
>>>> this codepath is stale and less useful than it should be. As it is an
>>>> interface marked as @PublicEvolving it is originally aimed at serving
>>>> as user interface. Thus before doing deprecation or dropping, we'd like
>>>> to see if there are users implementing their job based on this
>>>> interface(org.apache.flink.api.common.Program) and if there is any,
>>>> we are curious about how it is used.
>>>>
>>>> If little or none of Flink user based on this interface, we would
>>>> propose deprecating or dropping it.
>>>>
>>>> I really appreciate your time and your insight.
>>>>
>>>> Best,
>>>> tison.
>>>>
>>>> [1]
>>>> https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E
>>>>
>>>
>>>
>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>

-- 
Best Regards

Jeff Zhang


Re: [SURVEY] How many people implement Flink job based on the interface Program?

2019-07-23 Thread Jeff Zhang
IIUC, the list of jobs contained in the jar means the jobs you define in the
pipeline. In that case I don't think it is Flink's responsibility to maintain
the job list; the job scheduler defines the pipeline, so the job scheduler
should maintain the job list.
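
As a rough illustration of that split of responsibilities, a scheduler that
owns the job list might look like the sketch below. Everything here is
hypothetical (run_pipeline, fake_submit, the step names and class names); a
real setup would replace fake_submit with a call to the job server and would
more likely use an existing workflow scheduler.

```python
def run_pipeline(steps, submit_job):
    """Run enabled steps in order; stop at the first step that does not finish."""
    completed = []
    for step in steps:
        if not step.get("enabled", True):
            continue  # optional step switched off in the pipeline definition
        if submit_job(step["main_class"]) != "FINISHED":
            break
        completed.append(step["name"])
    return completed


def fake_submit(main_class):
    # Stand-in for a call to the job server; always reports success here.
    return "FINISHED"


# The scheduler, not Flink, holds the list of jobs and which ones are enabled.
pipeline = [
    {"name": "ingest", "main_class": "com.example.IngestJob"},
    {"name": "enrich", "main_class": "com.example.EnrichJob", "enabled": False},
    {"name": "export", "main_class": "com.example.ExportJob"},
]
done = run_pipeline(pipeline, fake_submit)
print(done)
```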



On Tue, Jul 23, 2019 at 5:23 PM Flavio Pompermaier wrote:

> The jobs are somehow related to each other in the sense that we have a
> configurable pipeline where there are optional steps you can enable/disable
> (and thus we create a single big jar).
> Because of this, we have our application REST service that actually also
> works as a job scheduler and uses the job server as a proxy towards Flink:
> when one step ends (this is what is signalled back after the env.execute()
> from Flink to the application REST service) our application tells the job
> server to execute the next job of the pipeline on the cluster.
> Of course this is a "dirty" solution (because we should use a workflow
> scheduler like Airflow or Luigi or similar) but we wanted to keep things as
> simple as possible for the moment.
> In the future, if our customers ever want to improve this part, we will
> probably integrate our application with a dedicated job scheduler like the
> ones listed before. I don't know if some of them are nowadays already
> integrated with Flink; when we started coding our frontend application (2
> years ago) none of them were using it.
>
> Best,
> Flavio
>
> On Tue, Jul 23, 2019 at 10:40 AM Jeff Zhang  wrote:
>
>> Thanks Flavio,
>>
>> I get most of your points except one
>>
>>- Get the list of jobs contained in jar (ideally this is is true for
>>every engine beyond Spark or Flink)
>>
>> Just curious to know how you submit job via rest api, if there're
>> multiple jobs in one jar, then do you need to submit jar one time and
>> submit jobs multiple times ?
>> And is there any relationship between these jobs in the same jar ?
>>
>>
>>
>> On Tue, Jul 23, 2019 at 4:01 PM Flavio Pompermaier wrote:
>>
>>> Hi Jeff, the thing about the manifest is really about having a way to
>>> list multiple main classes in the jar (without the need to inspect every
>>> Java class or forcing a 1-to-1 mapping between jar and job like it is now).
>>> My requirements were driven by the UI we're using in our framework:
>>>
>>>    - Get the list of jobs contained in the jar (ideally this is true for
>>>    every engine beyond Spark or Flink)
>>>    - Get the list of required/optional parameters for each job
>>>    - Besides the optionality of a parameter, each parameter should
>>>    include a help description, a type (to validate the input param), a
>>>    default value and a set of choices (when there's a limited number of
>>>    options available)
>>>    - obviously the job server should be able to
>>>    submit/run/cancel/monitor a job and upload/delete the uploaded jars
>>>    - the job server should not depend on any target platform dependency
>>>    (Spark or Flink) beyond the rest client: at the moment the rest client
>>>    requires a lot of core libs (indeed because it needs to submit the job
>>>    graph/plan)
>>>    - in our vision, the flink client should be something like Apache
>>>    Livy (https://livy.apache.org/)
>>>    - One of the biggest limitations we face when running a Flink job
>>>    from the REST API is the fact that the job can't do anything after
>>>    env.execute() while we need to call an external service to signal that
>>>    the job has ended + some other details
>>>
>>> Best,
>>> Flavio
>>>
>>> On Tue, Jul 23, 2019 at 3:44 AM Jeff Zhang  wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> Based on the discussion in the tickets you mentioned above, the
>>>> program-class attribute was a mistake and community is intended to use
>>>> main-class to replace it.
>>>>
>>>> Deprecating Program interface is a part of work of flink new client
>>>> api.
>>>> IIUC, your requirements are not so complicated. We can implement that
>>>> in the new flink client api. How about listing your requirement, and let's
>>>> discuss how we can make it in the new flink client api. BTW, I guess most
>>>> of your requirements are based on your flink job server, It would be
>>>> helpful if you could provide more info about your flink job server. Thanks
>>>>
>>>>
>>>>
>>>> On Mon, Jul 22, 2019 at 8:59 PM Flavio Pompermaier wrote:
>>

Re: Re: Flink and Akka version incompatibility ..

2019-07-24 Thread Jeff Zhang
I think it is better to shade all of Flink's dependencies so that projects
using Flink won't hit this kind of issue.


On Wed, Jul 24, 2019 at 4:07 PM Haibo Sun wrote:

> Hi,   Debasish Ghosh
>
> I don't know why Akka is not shaded; maybe it can be. Chesnay may be
> able to answer that.
> I recommend shading the Akka dependency of your application, because it is
> not known what might go wrong with shading Flink's Akka.
>
> CC  @Chesnay Schepler
>
> Best,
> Haibo
>
> At 2019-07-24 15:48:59, "Debasish Ghosh"  wrote:
>
> The problem that I am facing is with Akka serialization .. Why not shade
> the whole of Akka ?
>
> java.lang.AbstractMethodError:
>> akka.remote.RemoteActorRefProvider.serializationInformation()Lakka/serialization/Serialization$Information;
>> at
>> akka.serialization.Serialization.serializationInformation(Serialization.scala:166)
>
>
> Akka 2.6 is just around the corner and I don't think Flink will upgrade to
> Akka 2.6 that soon .. so somehow this problem is bound to recur ..
>
> regards.
>
> On Wed, Jul 24, 2019 at 1:01 PM Zili Chen  wrote:
>
>> I can see that we relocate Akka's netty and Akka's uncommon math, but I am
>> also curious why Flink doesn't shade all of its Akka dependencies...
>>
>> Best,
>> tison.
>>
>>
>> On Wed, Jul 24, 2019 at 3:15 PM Debasish Ghosh wrote:
>>
>>> Hello Haibo -
>>>
>>> Yes, my application depends on Akka 2.5.
>>> Just curious, why do you think it's recommended to shade Akka version of
>>> my application instead of Flink ?
>>>
>>> regards.
>>>
>>> On Wed, Jul 24, 2019 at 12:42 PM Haibo Sun  wrote:
>>>
>>>> Hi  Debasish Ghosh,
>>>>
>>>> Does your application have to depend on Akka 2.5? If not, it's a good
>>>> idea to always keep the Akka version that the application depend on in line
>>>> with Flink.
>>>> If you want to try shading Akka dependency, I think that it is more
>>>> recommended to shade Akka dependency of your application.
>>>>
>>>> Best,
>>>> Haibo
>>>>
>>>> At 2019-07-24 14:31:29, "Debasish Ghosh" 
>>>> wrote:
>>>>
>>>> Hello -
>>>>
>>>> An application that uses Akka 2.5 and Flink 1.8.0 gives runtime errors
>>>> because of version mismatch between Akka that we use and the one that Flink
>>>> uses (which is Akka 2.4). Anyone tried shading Akka dependency with Flink ?
>>>>
>>>> Or is there any other alternative way to handle this issue ? I know
>>>> Flink 1.9 has upgraded to Akka 2.5 but this is (I think) going to be a
>>>> recurring problem down the line with mismatch between the new releases of
>>>> Akka and Flink.
>>>>
>>>> regards.
>>>>
>>>> --
>>>> Debasish Ghosh
>>>> http://manning.com/ghosh2
>>>> http://manning.com/ghosh
>>>>
>>>> Twttr: @debasishg
>>>> Blog: http://debasishg.blogspot.com
>>>> Code: http://github.com/debasishg
>>>>
>>>>
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
>

-- 
Best Regards

Jeff Zhang


Re: Error while running flink job on local environment

2019-07-30 Thread Jeff Zhang
@Andrey,

Although your approach will work, it requires the user to write different
code for local mode and other modes, which is inconvenient.
IMHO, we should not check these kinds of memory configuration in local
mode, or we should implicitly set the TM memory fairly large in local mode
to avoid this kind of problem.
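
To put the numbers from the quoted error message in perspective, here is a
small back-of-the-envelope calculation. The function is a deliberately
simplified model of how the three configuration keys named in the error
(fraction, min, max) bound the buffer pool; it is an assumption for
illustration, Flink's real calculation has more cases, and the values passed
to it are examples rather than Flink's actual defaults.

```python
SEGMENT_BYTES = 32768  # buffer size reported in the error message
MIB = 1024 * 1024


def network_buffer_count(jvm_bytes, fraction, min_bytes, max_bytes,
                         segment_bytes=SEGMENT_BYTES):
    """Simplified model: a fraction of JVM memory, clamped to [min, max]."""
    budget = min(max(int(jvm_bytes * fraction), min_bytes), max_bytes)
    return budget // segment_bytes


pool_mib = 12851 * SEGMENT_BYTES / MIB   # size of the pool from the stack trace
shortfall = 8 - 3                        # buffers required minus buffers available
small_jvm = network_buffer_count(100 * MIB, 0.1, 64 * MIB, 1024 * MIB)
print(pool_mib, shortfall, small_jvm)
```

So the pool in the report is roughly 400 MiB in total, and the failing task was
only 5 buffers short, which suggests the pool was exhausted by the other tasks
running in the same local JVM rather than being tiny to begin with.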

On Wed, Jul 31, 2019 at 1:32 AM Andrey Zagrebin wrote:

> Hi Vinayak,
>
> the error message provides a hint about changing config options, you could
> try to use StreamExecutionEnvironment.createLocalEnvironment(2,
> customConfig); to increase resources.
> this issue might also address the problem, it will be part of 1.9 release:
> https://issues.apache.org/jira/browse/FLINK-12852
>
> Best,
> Andrey
>
> On Tue, Jul 30, 2019 at 5:42 PM Vinayak Magadum 
> wrote:
>
>> Hi,
>>
>> I am using Flink version: 1.7.1
>>
>> I have a flink job that gets the execution environment as below and
>> executes the job.
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> When I run the code in cluster, it runs fine. But on local machine while
>> running the job via IntelliJ I get below error:
>>
>> org.apache.flink.runtime.client.JobExecutionException: Job execution
>> failed.
>> at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>> at
>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>> at
>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>> 
>> Caused by: java.io.IOException: Insufficient number of network buffers:
>> required 8, but only 3 available. The total number of network buffers is
>> currently set to 12851 of 32768 bytes each. You can increase this number by
>> setting the configuration keys 'taskmanager.network.memory.fraction',
>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>> at
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:272)
>> at
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257)
>> at
>> org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:278)
>> at
>> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Workaround that I tried to make it run on local is to use
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.createLocalEnvironment(2);
>>
>> instead of StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> With Flink 1.4.2, StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment(); used to work on both
>> cluster as well as local environment.
>>
>> Is there any way to make
>> StreamExecutionEnvironment.getExecutionEnvironment(); work in both cluster
>> and local mode in flink 1.7.1? Specifically how to make it work locally via
>> IntelliJ.
>> 
>> Thanks & Regards,
>> Vinayak
>>
>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-07 Thread Jeff Zhang
Congrats Hequn!

On Wed, Aug 7, 2019 at 5:08 PM Paul Lam wrote:

> Congrats Hequn! Well deserved!
>
> Best,
> Paul Lam
>
> On Aug 7, 2019, at 16:28, jincheng sun wrote:
>
> Hi everyone,
>
> I'm very happy to announce that Hequn accepted the offer of the Flink PMC
> to become a committer of the Flink project.
>
> Hequn has been contributing to Flink for many years, mainly working on
> SQL/Table API features. He's also frequently helping out on the user
> mailing lists and helping check/vote the release.
>
> Congratulations Hequn!
>
> Best, Jincheng
> (on behalf of the Flink PMC)
>
>
>

-- 
Best Regards

Jeff Zhang


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Jeff Zhang
:205)
>>> at 
>>> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>> at 
>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>>
>>>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC

2019-09-06 Thread Jeff Zhang
Congrats Klou!

On Fri, Sep 6, 2019 at 9:51 PM Zili Chen wrote:

> Congrats Klou!
>
> Best,
> tison.
>
>
> On Fri, Sep 6, 2019 at 9:23 PM Till Rohrmann wrote:
>
>> Congrats Klou!
>>
>> Cheers,
>> Till
>>
>> On Fri, Sep 6, 2019 at 3:00 PM Dian Fu  wrote:
>>
>>> Congratulations Kostas!
>>>
>>> Regards,
>>> Dian
>>>
>>> > On Sep 6, 2019, at 8:58 PM, Wesley Peng wrote:
>>> >
>>> > On 2019/9/6 8:55 PM, Fabian Hueske wrote:
>>> >> I'm very happy to announce that Kostas Kloudas is joining the Flink
>>> PMC.
>>> >> Kostas is contributing to Flink for many years and puts lots of
>>> effort in helping our users and growing the Flink community.
>>> >> Please join me in congratulating Kostas!
>>> >
>>> > congratulation Kostas!
>>> >
>>> > regards.
>>>
>>>

-- 
Best Regards

Jeff Zhang


Re: error: Static methods in interface require -target:jvm-1.8 using scala 2.11

2019-09-10 Thread Jeff Zhang
Add this to the configuration of your scala-maven-plugin:

<configuration>
  <args>
    <arg>-target:jvm-1.8</arg>
  </args>
</configuration>




On Wed, Sep 11, 2019 at 12:07 PM Ben Yan wrote:

> The following is the environment I use:
> 1. flink.version: 1.9.0
> 2. java version "1.8.0_212"
> 3. scala version: 2.11.12
>
> When I wrote the following code in the scala programming language, I found
> the following error:
>
> // set up the batch execution environment
> val bbSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
> val bbTableEnv = TableEnvironment.create(bbSettings)
>
> error: Static methods in interface require -target:jvm-1.8
> [ERROR] val bbTableEnv = TableEnvironment.create(bbSettings)
>
> But when I use the java programming language or the version of scala in 2.12, 
> there is no problem.
>
> If I use the version of scala2.11, is there any way to solve this problem? 
> thanks
>
>
> Best,
>
> Ben
>
>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Zili Chen becomes a Flink committer

2019-09-11 Thread Jeff Zhang
Congratulations Zili Chen!

On Wed, Sep 11, 2019 at 5:25 PM Wesley Peng wrote:

> Hi
>
> on 2019/9/11 17:22, Till Rohrmann wrote:
> > I'm very happy to announce that Zili Chen (some of you might also know
> > him as Tison Kun) accepted the offer of the Flink PMC to become a
> > committer of the Flink project.
>
> Congratulations Zili Chen.
>
> regards.
>


-- 
Best Regards

Jeff Zhang


Re: No result shown when submitting the SQL in cli

2021-05-11 Thread Jeff Zhang
The result is printed in the TM.
In the IDE the job runs in local mode, so the TM runs in your local JVM;
that's why you see the result there.
In distributed mode (either YARN or standalone mode), as when you are
in sql-client, you should be able to see the result in the TM logs.


On Tue, May 11, 2021 at 11:40 PM tao xiao wrote:

> Can anyone help with this question?
>
> On Thu, May 6, 2021 at 3:26 PM tao xiao  wrote:
>
>> Hi team,
>>
>> I wrote a simple SQL job to select data from Kafka. I can see results
>> printing out in IDE but when I submit the job to a standalone cluster in
>> CLI there is no result shown. I am sure the job is running well in the
>> cluster with debug log suggesting that the kafka consumer is fetching data
>> from Kafka. I enabled debug log in CLI and I don't see any obvious log.
>> Here is the job code snippet
>>
>> public static void main(String[] args) throws Exception {
>>   StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
>>       StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1));
>>
>>   String sqls = new String(Files.readAllBytes(Paths.get(args[0])));
>>   splitIgnoreQuota(sqls, ';').forEach(sql -> {
>>     TableResult tableResult = tableEnv.executeSql(sql);
>>     tableResult.print();
>>   });
>> }
>>
>> It simply parses a sql file and execute the statements
>>
>> Here is the SQL statements
>>
>> CREATE TABLE t1 (
>>   `f1` STRING,
>>   `f2` STRING
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'topic',
>>   'properties.group.id' = 'test1',
>>   'properties.max.partition.fetch.bytes' = '16384',
>>   'properties.enable.auto.commit' = 'false',
>>   'properties.bootstrap.servers' = 'kafka:9092',
>>   'scan.startup.mode' = 'earliest-offset',
>>   'format' = 'json'
>> );
>>
>> SELECT * FROM t1
>>
>>
>> Below is the result I got from IDE
>> | +I | b8f5 |   abcd |
>> | +I | b8f5 |   abcd |
>>
>> And this is the result from CLI
>> bin/flink run  -m localhost:8081 -c kafka.sample.flink.SQLSample
>> ~/workspaces/kafka-sample/target/kafka-sample-0.1.2-jar-with-dependencies.jar
>> /sample.sql
>> ++
>> | result |
>> ++
>> | OK |
>> ++
>> 1 row in set
>> Job has been submitted with JobID ace45d2ff850675243e2663d3bf11701
>> ++++
>> | op |   uuid |ots |
>> ++++
>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Regards,
> Tao
>


-- 
Best Regards

Jeff Zhang


Re: Jupyter PyFlink Web UI

2021-06-09 Thread Jeff Zhang
Hi Maciek,

You can try Zeppelin, which supports PyFlink and displays the Flink job URL inline.

http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html


On Wed, Jun 9, 2021 at 1:53 PM Maciej Bryński wrote:

> Nope.
> I found the following solution.
>
> conf = Configuration()
> env = StreamExecutionEnvironment(
>     get_gateway().jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>     .createLocalEnvironmentWithWebUI(conf._j_configuration))
> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(
>     stream_execution_environment=env, environment_settings=env_settings)
>
> I also created the bug report
> https://issues.apache.org/jira/browse/FLINK-22924.
> I think this API should be exposed in Python.
>
> On Wed, Jun 9, 2021 at 04:57 Dian Fu wrote:
> >
> > Hi Maciek,
> >
> > You could try if the following works:
> >
> > ```
> > table_env.get_config().get_configuration().set_string("rest.bind-port", "xxx")
> > ```
> >
> > Regards,
> > Dian
> >
> > > On Jun 8, 2021, at 8:26 PM, maverick wrote:
> > >
> > > Hi,
> > > I've got a question. I'm running PyFlink code from a Jupyter Notebook,
> > > starting TableEnvironment with the following code:
> > >
> > > env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> > > table_env = TableEnvironment.create(env_settings)
> > >
> > > How can I enable Web UI in this code?
> > >
> > > Regards,
> > > Maciek
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >
>
>
> --
> Maciek Bryński
>


-- 
Best Regards

Jeff Zhang
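
For reference, the workaround quoted in this thread can be written out as a
self-contained sketch with the imports it relies on. The function name
create_table_env_with_web_ui is hypothetical. Everything else is taken from
Maciek's message, including use_blink_planner(), which was deprecated in
later PyFlink releases, and the private attribute conf._j_configuration. The
imports sit inside the function only so that the module can be loaded
without PyFlink installed; calling it assumes PyFlink and a JVM are
available.

```python
def create_table_env_with_web_ui():
    """Build a local StreamTableEnvironment backed by an environment
    created through the Java createLocalEnvironmentWithWebUI call."""
    from pyflink.common import Configuration
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.java_gateway import get_gateway
    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    conf = Configuration()
    # The Python API in this thread does not expose this factory method,
    # so it is reached through the Py4J gateway, as in the quoted message.
    j_env_class = (
        get_gateway().jvm.org.apache.flink.streaming.api.environment
        .StreamExecutionEnvironment
    )
    env = StreamExecutionEnvironment(
        j_env_class.createLocalEnvironmentWithWebUI(conf._j_configuration)
    )
    env_settings = (
        EnvironmentSettings.new_instance()
        .in_streaming_mode()
        .use_blink_planner()  # from the original message; version dependent
        .build()
    )
    return StreamTableEnvironment.create(
        stream_execution_environment=env,
        environment_settings=env_settings,
    )
```

In a notebook cell this would be used as
table_env = create_table_env_with_web_ui(), after which jobs started from
table_env run in the local environment created above.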


Re: Jupyter PyFlink Web UI

2021-06-09 Thread Jeff Zhang
BTW, you can also send an email to the Zeppelin user mailing list to join the
Zeppelin Slack channel and discuss this in more detail.
http://zeppelin.apache.org/community.html


On Wed, Jun 9, 2021 at 6:34 PM Jeff Zhang wrote:

> Hi Maciek,
>
> You can try Zeppelin, which supports PyFlink and displays the Flink job URL
> inline.
>
> http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html
>
>
> On Wed, Jun 9, 2021 at 1:53 PM Maciej Bryński wrote:
>
>> Nope.
>> I found the following solution.
>>
>> conf = Configuration()
>> env = StreamExecutionEnvironment(get_gateway().jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf._j_configuration))
>> env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>> table_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings)
>>
>> I also created the bug report
>> https://issues.apache.org/jira/browse/FLINK-22924.
>> I think this API should be exposed in Python.
>>
>> On Wed, Jun 9, 2021 at 04:57 Dian Fu wrote:
>> >
>> > Hi Maciek,
>> >
>> > You could check whether the following works:
>> >
>> > ```
>> > table_env.get_config().get_configuration().set_string("rest.bind-port", "xxx")
>> > ```
>> >
>> > Regards,
>> > Dian
>> >
>> > > On Jun 8, 2021, at 8:26 PM, maverick wrote:
>> > >
>> > > Hi,
>> > > I've got a question. I'm running PyFlink code from a Jupyter Notebook,
>> > > starting TableEnvironment with the following code:
>> > >
>> > > env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>> > > table_env = TableEnvironment.create(env_settings)
>> > >
>> > > How can I enable Web UI in this code?
>> > >
>> > > Regards,
>> > > Maciek
>> > >
>> > >
>> > >
>> > > --
>> > > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> >
>>
>>
>> --
>> Maciek Bryński
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


-- 
Best Regards

Jeff Zhang


Registering custom metrics on the JobManager

2021-07-15 Thread Jeff Charles
Hi,

I would like to register a custom metric on the JobManager as opposed to a
TaskManager. I cannot seem to locate any documentation that indicates how
to do this or even if it's currently possible or not.

Does anyone have any guidance on how to register a custom metric on the
JobManager?

Jeff Charles


Re: Registering custom metrics on the JobManager

2021-07-15 Thread Jeff Charles
We would like a counter of exceptions so we can alert if there's an
anomalous increase in them. I realize a counter in the JobManager would not
capture anywhere close to all exceptions but even capturing a count of a
subset that we're able to track would be helpful.

On Thu, Jul 15, 2021 at 3:47 PM Chesnay Schepler  wrote:

> This is currently not possible. What metric are you interested in?
>
> On 15/07/2021 21:16, Jeff Charles wrote:
> > Hi,
> >
> > I would like to register a custom metric on the JobManager as opposed
> > to a TaskManager. I cannot seem to locate any documentation that
> > indicates how to do this or even if it's currently possible or not.
> >
> > Does anyone have any guidance on how to register a custom metric on
> > the JobManager?
> >
> > Jeff Charles
>
>
>

