Re: Community fork of Flink Scala API for Scala 2.12/2.13/3.x

2022-05-12 Thread Jeff Zhang
That's true, the Scala shell has been removed from Flink. Fortunately, Apache
Zeppelin has its own Scala REPL for Flink. So if Flink can support Scala
2.13, I am wondering whether it would be possible to integrate it into the
Scala shell so that users can run Flink Scala code in a notebook, like Spark.
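
For readers of the quoted thread below, here is a minimal, self-contained sketch of the ADT issue Roman describes: with the official org.apache.flink.streaming.api.scala._ API, a sealed-trait hierarchy is analyzed as a generic type and falls back to Kryo. The event classes are made up for illustration.

```
import org.apache.flink.streaming.api.scala._

// Illustrative ADT (sealed trait + case classes) used as a stream element type.
sealed trait Event
case class Click(id: String) extends Event
case class Purchase(id: String, amount: Double) extends Event

object AdtKryoIllustration {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The implicitly derived TypeInformation for the sealed trait is a generic
    // type, so serialization goes through the Kryo fallback (the slow path
    // mentioned in the quoted message).
    val events: DataStream[Event] =
      env.fromElements[Event](Click("a"), Purchase("b", 9.99))
    events.print()
    env.execute("adt-kryo-illustration")
  }
}
```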

On Thu, May 12, 2022 at 11:06 PM Roman Grebennikov  wrote:

> Hi,
>
> AFAIK scala REPL was removed completely in Flink 1.15 (
> https://issues.apache.org/jira/browse/FLINK-24360), so there is nothing
> to cross-build.
>
> Roman Grebennikov | g...@dfdx.me
>
>
> On Thu, May 12, 2022, at 14:55, Jeff Zhang wrote:
>
> Great work Roman, do you think it is possible to run in scala shell as
> well?
>
> On Thu, May 12, 2022 at 10:43 PM Roman Grebennikov  wrote:
>
>
> Hello,
>
> As far as I understand the discussions on this mailing list, there are
> almost no people maintaining the official Scala API in Apache Flink. Due to
> some technical complexities it will probably be stuck for a very long time
> on Scala 2.12 (which is not EOL yet, but quite close to it):
> * The Traversable serializer relies a lot on CanBuildFrom (so it's read and
> compiled on restore), which is missing in Scala 2.13 and 3.x - migrating
> off this approach while maintaining savepoint compatibility can be quite a
> complex task.
> * The Scala API uses an implicitly generated TypeInformation, produced by a
> giant scary mkTypeInfo macro, which would have to be completely rewritten
> for Scala 3.x.
>
> But even in the current state, Scala support in Flink has some issues with
> ADTs (sealed traits, a popular data modelling pattern) not being natively
> supported, so if you use them, you have to fall back to Kryo, which is not
> that fast: we've seen 3x-4x throughput drops in performance tests.
>
> At my current company we made a library (
> https://github.com/findify/flink-adt) which uses Magnolia (
> https://github.com/softwaremill/magnolia) to do all the compile-time
> TypeInformation generation and make Scala ADTs nice & fast in Flink. With a
> couple of community contributions it is now possible to cross-build it
> for Scala 3 as well.
>
> As Flink 1.15 core is scala free, we extracted the DataStream part of
> Flink Scala API into a separate project, glued it together with flink-adt
> and ClosureCleaner from Spark 3.2 (supporting Scala 2.13 and jvm17) and
> cross-compiled it for 2.12/2.13/3.x. You can check out the result on this
> github project: https://github.com/findify/flink-scala-api
>
> So technically speaking, now it's possible to migrate a scala flink job
> from 2.12 to 3.x with:
> * replace flink-streaming-scala dependency with flink-scala-api (optional,
> both libs can co-exist in classpath on 2.12)
> * replace all imports of org.apache.flink.streaming.api.scala._ with ones
> from the new library
> * rebuild the job for 3.x
>
> The main drawback is that there is no savepoint compatibility due to
> CanBuildFrom and different way of handling ADTs. But if you can afford
> re-bootstrapping the state - migration is quite straightforward.
>
> The README on github https://github.com/findify/flink-scala-api#readme
> has some more details on how and why this project was done in this way. And
> the project is a bit experimental, so if you're interested in scala3 on
> Flink, you're welcome to share your feedback and ideas.
>
> with best regards,
> Roman Grebennikov | g...@dfdx.me
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Community fork of Flink Scala API for Scala 2.12/2.13/3.x

2022-05-12 Thread Jeff Zhang
Great work Roman, do you think it would be possible to run it in the Scala shell as well?

On Thu, May 12, 2022 at 10:43 PM Roman Grebennikov  wrote:

> Hello,
>
> As far as I understand the discussions on this mailing list, there are
> almost no people maintaining the official Scala API in Apache Flink. Due to
> some technical complexities it will probably be stuck for a very long time
> on Scala 2.12 (which is not EOL yet, but quite close to it):
> * The Traversable serializer relies a lot on CanBuildFrom (so it's read and
> compiled on restore), which is missing in Scala 2.13 and 3.x - migrating
> off this approach while maintaining savepoint compatibility can be quite a
> complex task.
> * The Scala API uses an implicitly generated TypeInformation, produced by a
> giant scary mkTypeInfo macro, which would have to be completely rewritten
> for Scala 3.x.
>
> But even in the current state, Scala support in Flink has some issues with
> ADTs (sealed traits, a popular data modelling pattern) not being natively
> supported, so if you use them, you have to fall back to Kryo, which is not
> that fast: we've seen 3x-4x throughput drops in performance tests.
>
> At my current company we made a library (
> https://github.com/findify/flink-adt) which uses Magnolia (
> https://github.com/softwaremill/magnolia) to do all the compile-time
> TypeInformation generation and make Scala ADTs nice & fast in Flink. With a
> couple of community contributions it is now possible to cross-build it
> for Scala 3 as well.
>
> As Flink 1.15 core is scala free, we extracted the DataStream part of
> Flink Scala API into a separate project, glued it together with flink-adt
> and ClosureCleaner from Spark 3.2 (supporting Scala 2.13 and jvm17) and
> cross-compiled it for 2.12/2.13/3.x. You can check out the result on this
> github project: https://github.com/findify/flink-scala-api
>
> So technically speaking, now it's possible to migrate a scala flink job
> from 2.12 to 3.x with:
> * replace flink-streaming-scala dependency with flink-scala-api (optional,
> both libs can co-exist in classpath on 2.12)
> * replace all imports of org.apache.flink.streaming.api.scala._ with ones
> from the new library
> * rebuild the job for 3.x
>
> The main drawback is that there is no savepoint compatibility due to
> CanBuildFrom and different way of handling ADTs. But if you can afford
> re-bootstrapping the state - migration is quite straightforward.
>
> The README on github https://github.com/findify/flink-scala-api#readme
> has some more details on how and why this project was done in this way. And
> the project is a bit experimental, so if you're interested in scala3 on
> Flink, you're welcome to share your feedback and ideas.
>
> with best regards,
> Roman Grebennikov | g...@dfdx.me
>
>

-- 
Best Regards

Jeff Zhang


Re: Unable to start sql-client when putting flink-table-planner_2.12-1.15.0.jar to lib folder

2022-05-08 Thread Jeff Zhang
Thanks Yuxia, that works. Does that mean that with one Flink distribution I
can either use Java or use Scala? If so, that seems not very user friendly.



On Sun, May 8, 2022 at 10:40 AM yuxia  wrote:

> Hi, you can move the flink-table-planner-loader to the /opt.  See more in
> https://issues.apache.org/jira/browse/FLINK-25128
>
>
> Best regards,
> Yuxia
>
> ------
> *From: *"Jeff Zhang"
> *To: *"User"
> *Sent: *Saturday, May 7, 2022, 10:05:55 PM
> *Subject: *Unable to start sql-client when putting
> flink-table-planner_2.12-1.15.0.jar to lib folder
>
> Hi folks,
> It looks like Flink 1.15 changed its binary distribution as part of going
> Scala-free. The flink-table-planner_2.12-1.15.0.jar is now put under the opt
> folder. I would like to use it for my Scala Flink app, so I moved it to the
> lib folder, but after that I cannot start sql-client. Is that expected?
> Here's the error I see:
>
>
> -
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> Caused by: org.apache.flink.table.api.TableException: Could not
> instantiate the executor. Make sure a planner module is on the classpath
> at
> org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:163)
> at
> org.apache.flink.table.client.gateway.context.ExecutionContext.createTableEnvironment(ExecutionContext.java:111)
> at
> org.apache.flink.table.client.gateway.context.ExecutionContext.<init>(ExecutionContext.java:66)
> at
> org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:247)
> at
> org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:87)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:87)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
> at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
> ... 1 more
> Caused by: org.apache.flink.table.api.ValidationException: Multiple
> factories for identifier 'default' that implement
> 'org.apache.flink.table.delegation.ExecutorFactory' found in the classpath.
>
> Ambiguous factory classes are:
>
> org.apache.flink.table.planner.delegation.DefaultExecutorFactory
> org.apache.flink.table.planner.loader.DelegateExecutorFactory
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:553)
> at
> org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:154)
> ... 8 more
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>

-- 
Best Regards

Jeff Zhang


Unable to start sql-client when putting flink-table-planner_2.12-1.15.0.jar to lib folder

2022-05-07 Thread Jeff Zhang
Hi folks,

It looks like Flink 1.15 changed its binary distribution as part of going
Scala-free. The flink-table-planner_2.12-1.15.0.jar is now put under the opt
folder. I would like to use it for my Scala Flink app, so I moved it to the
lib folder, but after that I cannot start sql-client. Is that expected?
Here's the error I see:

-
Exception in thread "main"
org.apache.flink.table.client.SqlClientException: Unexpected exception.
This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.table.api.TableException: Could not instantiate
the executor. Make sure a planner module is on the classpath
at
org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:163)
at
org.apache.flink.table.client.gateway.context.ExecutionContext.createTableEnvironment(ExecutionContext.java:111)
at
org.apache.flink.table.client.gateway.context.ExecutionContext.<init>(ExecutionContext.java:66)
at
org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:247)
at
org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:87)
at
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:87)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
... 1 more
Caused by: org.apache.flink.table.api.ValidationException: Multiple
factories for identifier 'default' that implement
'org.apache.flink.table.delegation.ExecutorFactory' found in the classpath.

Ambiguous factory classes are:

org.apache.flink.table.planner.delegation.DefaultExecutorFactory
org.apache.flink.table.planner.loader.DelegateExecutorFactory
at
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:553)
at
org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:154)
... 8 more



-- 
Best Regards

Jeff Zhang


Re: scala shell not part of 1.14.4 download

2022-03-18 Thread Jeff Zhang
Hi Georg,

You can try Zeppelin 0.10.1, which supports Scala 2.12 for the
Flink interpreter. Internally, Zeppelin's Flink interpreter uses the Scala
shell, so you can write Scala code and run it interactively.

https://zeppelin.apache.org/download.html
https://zeppelin.apache.org/docs/0.10.1/interpreter/flink.html
https://medium.com/analytics-vidhya/learn-flink-sql-the-easy-way-d9d48a95ae57
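
As a rough sketch of what that interactive use looks like (assuming a %flink paragraph and the `benv` ExecutionEnvironment that Zeppelin's Flink interpreter pre-creates, per the docs above):

```
%flink
// Word count typed directly into a notebook paragraph and run interactively.
val text = benv.fromElements("hello flink", "hello zeppelin")
val counts = text
  .flatMap(_.toLowerCase.split("\\s+"))
  .map(word => (word, 1))
  .groupBy(0)
  .sum(1)
counts.print()  // the result shows up in the notebook output
```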


On Fri, Mar 18, 2022 at 8:10 PM Martijn Visser 
wrote:

> Hi Georg,
>
> As far as I know, there has never been a Scala Shell for Scala 2.12
> because it was not supported, only for Scala 2.11. The Scala Shell has
> also been completely dropped with Flink 1.15.
>
> Best regards,
>
> Martijn
>
> On Fri, 18 Mar 2022 at 12:43, Georg Heiler 
> wrote:
>
>> Hi,
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/repls/scala_shell/
>> mentions:
>>
>> bin/start-scala-shell.sh local
>>
>> a script to start a scala REPL shell.
>>
>> But the download for Flink 
>> https://www.apache.org/dyn/closer.lua/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
>>
>> Does not seem to include this script anymore.
>>
>> Am I missing something?
>>
>> How can I still start a scala repl?
>>
>> Best,
>>
>> Georg
>>
>>

-- 
Best Regards

Jeff Zhang


Re: How to solve the target:jvm-1.8 error when run start-scala-shell.sh

2021-10-30 Thread Jeff Zhang
The reason is that flink-scala-shell uses Scala 2.11, which targets JVM 1.6
by default; that's why it cannot use any library that depends on JVM 1.8.

You can use Zeppelin instead, which supports the Scala shell with Scala 2.12.
I have verified that it works in Zeppelin when you use Scala 2.12.

Check these links for more details.

https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html
https://medium.com/analytics-vidhya/learn-flink-sql-the-easy-way-d9d48a95ae57
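
For comparison, the same check from the quoted question should compile in a Scala 2.12 shell (e.g. Zeppelin's Flink interpreter with the AWS SDK sts jar on the interpreter classpath), since Scala 2.12 targets JVM 1.8. A sketch, not a verified setup:

```
// builder() is a static method on the StsAsyncClient interface, which needs a
// JVM 1.8 bytecode target; this works under Scala 2.12 but not the 2.11 shell.
import software.amazon.awssdk.services.sts.StsAsyncClient

val builder = StsAsyncClient.builder()
```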



Jing Lu  于2021年10月30日周六 上午2:03写道:

> Hi Flink users,
>
> I am working on testing my code in start-scala-shell.sh. The flink version
> is: flink-1.12.0-bin-scala_2.11.tgz. I put the jar file (
> https://mvnrepository.com/artifact/software.amazon.awssdk/sts/2.15.65) in
> the lib folder.
>
> Then run the start-scala-shell.sh REPL:
>
> scala> import software.amazon.awssdk.services.sts.StsAsyncClient
> import software.amazon.awssdk.services.sts.StsAsyncClient
>
> scala> StsAsyncClient.builder
> <console>:72: error: Static methods in interface require -target:jvm-1.8
>StsAsyncClient.builder
>
> Why do I have this error? Is there any way to solve this problem?
>
>
> Thanks,
> Jing
>
>

-- 
Best Regards

Jeff Zhang


Re: modify the classloader of JM dynamically to handle "ADD JAR hdfs://" statement

2021-10-16 Thread Jeff Zhang
Hi vtygoss,

Have you taken a look at Zeppelin? Zeppelin supports Flink UDFs in several
different ways, and, as you said, it is not good practice to put UDF
jars under $FLINK_HOME/lib. In Zeppelin you don't need to do that: you can
load UDF jars dynamically. Check the following doc for more details.

https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html#flink-udf
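
As a small sketch of what that looks like in a notebook (assuming the `stenv` StreamTableEnvironment that Zeppelin's Flink interpreter pre-creates; the function itself is made up):

```
%flink
// Define and register a scalar UDF in a paragraph; no jar has to be copied
// into $FLINK_HOME/lib for this.
import org.apache.flink.table.functions.ScalarFunction

class ToUpper extends ScalarFunction {
  def eval(s: String): String = if (s == null) null else s.toUpperCase
}

stenv.createTemporarySystemFunction("to_upper", new ToUpper())
// to_upper(...) can now be used from SQL paragraphs in the same session.
```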



vtygoss  于2021年10月16日周六 下午4:54写道:

> Hi, community!
>
>
> I am working on building a stream processing platform using Flink 1.12.0.
> I met a problem in the scenario of SQL Application migration from
> SparkSQL/HiveSQL to FlinkSQL.
>
>
> How to dynamically modify the classloader of the JobManager already
> launched to handle "ADD JAR HDFS://..." statement in sql text?
>
>
> I already found the way to modify the classloader of all TaskManager
> dynamically through "Catalog#getHiveConf#setAuxJar" or
> "pipeline.classpath"; But i can't find the way to modify the classloader of
> JobManager dynamically, then the Application will fail because of UDF
> ClassNotFoundException. And i don't want put the udf-jar into
> $FLINK_HOME/lib, that's too heavy.
>
>
> Thanks for your any suggestions or replies!
>
>
> Best Regards!
>
>
>
>

-- 
Best Regards

Jeff Zhang


【Announce】Zeppelin 0.10.0 is released, Flink on Zeppelin Improved

2021-08-25 Thread Jeff Zhang
Hi Flink users,

We (the Zeppelin community) are very excited to announce that Zeppelin 0.10.0
is officially released. In this version, we made several improvements to the
Flink interpreter. Here are the main features of Flink on Zeppelin:

   - Support multiple versions of Flink
   - Support multiple versions of Scala
   - Support multiple languages
   - Support multiple execution modes
   - Support Hive
   - Interactive development
   - Enhancement on Flink SQL
   - Multi-tenancy
   - Rest API Support

Take a look at this document for more details:
https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html
The quickest way to try Flink on Zeppelin is via its docker image
https://zeppelin.apache.org/docs/0.10.0/interpreter/flink.html#play-flink-in-zeppelin-docker

Besides these, here's a blog about how to run the Flink SQL Cookbook on
Zeppelin, the easy way to learn Flink SQL:
https://medium.com/analytics-vidhya/learn-flink-sql-the-easy-way-d9d48a95ae57

Hope it is helpful for you, and you are welcome to join our community to
discuss with others: http://zeppelin.apache.org/community.html


-- 
Best Regards

Jeff Zhang


Re: Obtain JobManager Web Interface URL

2021-08-03 Thread Jeff Zhang
>> > >
>> > > Best,
>> > > Yangze Guo
>> > >
>> > > On Fri, Jul 30, 2021 at 1:41 AM Hailu, Andreas [Engineering] <
>> andreas.ha...@gs.com> wrote:
>> > > >
>> > > > Hi team,
>> > > >
>> > > >
>> > > >
>> > > > Is there a method available to obtain the JobManager’s REST url? We
>> originally overloaded CliFrontend#executeProgram and nabbed it from the
>> ClusterClient#getWebInterfaceUrl method, but it seems this method’s
>> signature has been changed and no longer available as of 1.10.0.
>> > > >
>> > > >
>> > > >
>> > > > Best,
>> > > >
>> > > > Andreas
>> > > >
>> > > >
>> > > >
>> > > >
>>
>

-- 
Best Regards

Jeff Zhang


Re: Jupyter PyFlink Web UI

2021-06-09 Thread Jeff Zhang
BTW, you can also send email to zeppelin user maillist to join zeppelin
slack channel to discuss more details.
http://zeppelin.apache.org/community.html


Jeff Zhang  于2021年6月9日周三 下午6:34写道:

> Hi Maciek,
>
> You can try Zeppelin, which supports PyFlink and displays the Flink job URL
> inline.
>
> http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html
>
>
> Maciej Bryński  于2021年6月9日周三 下午1:53写道:
>
>> Nope.
>> I found the following solution.
>>
>> conf = Configuration()
>> env =
>> StreamExecutionEnvironment(get_gateway().jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf._j_configuration))
>> env_settings =
>> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>> table_env =
>> StreamTableEnvironment.create(stream_execution_environment=env,
>> environment_settings=env_settings)
>>
>> I also created the bug report
>> https://issues.apache.org/jira/browse/FLINK-22924.
>> I think this API should be exposed in Python.
>>
>> śr., 9 cze 2021 o 04:57 Dian Fu  napisał(a):
>> >
>> > Hi Macike,
>> >
>> > You could try if the following works:
>> >
>> > ```
>> > table_env.get_config().get_configuration().set_string("rest.bind-port",
>> "xxx")
>> > ```
>> >
>> > Regards,
>> > Dian
>> >
>> > > 2021年6月8日 下午8:26,maverick  写道:
>> > >
>> > > Hi,
>> > > I've got a question. I'm running PyFlink code from Jupyter Notebook
>> starting
>> > > TableEnvironment with following code:
>> > >
>> > > env_settings =
>> > >
>> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>> > > table_env = TableEnvironment.create(env_settings)
>> > >
>> > > How can I enable Web UI in this code?
>> > >
>> > > Regards,
>> > > Maciek
>> > >
>> > >
>> > >
>> > > --
>> > > Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> >
>>
>>
>> --
>> Maciek Bryński
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


-- 
Best Regards

Jeff Zhang


Re: Jupyter PyFlink Web UI

2021-06-09 Thread Jeff Zhang
Hi Maciek,

You can try Zeppelin, which supports PyFlink and displays the Flink job URL inline.

http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html
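
For reference, the JVM-side equivalent of the PyFlink workaround quoted below is roughly this (a sketch for a local setup; flink-runtime-web has to be on the classpath, and pinning the port is optional):

```
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val conf = new Configuration()
conf.setInteger(RestOptions.PORT, 8081)  // optional: pin the web UI port
// Local environment with the web UI enabled; this is what the Python code
// below reaches through Py4J.
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
```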


Maciej Bryński  于2021年6月9日周三 下午1:53写道:

> Nope.
> I found the following solution.
>
> conf = Configuration()
> env =
> StreamExecutionEnvironment(get_gateway().jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf._j_configuration))
> env_settings =
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> table_env = StreamTableEnvironment.create(stream_execution_environment=env,
> environment_settings=env_settings)
>
> I also created the bug report
> https://issues.apache.org/jira/browse/FLINK-22924.
> I think this API should be exposed in Python.
>
> śr., 9 cze 2021 o 04:57 Dian Fu  napisał(a):
> >
> > Hi Macike,
> >
> > You could try if the following works:
> >
> > ```
> > table_env.get_config().get_configuration().set_string("rest.bind-port",
> "xxx")
> > ```
> >
> > Regards,
> > Dian
> >
> > > 2021年6月8日 下午8:26,maverick  写道:
> > >
> > > Hi,
> > > I've got a question. I'm running PyFlink code from Jupyter Notebook
> starting
> > > TableEnvironment with following code:
> > >
> > > env_settings =
> > >
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> > > table_env = TableEnvironment.create(env_settings)
> > >
> > > How can I enable Web UI in this code?
> > >
> > > Regards,
> > > Maciek
> > >
> > >
> > >
> > > --
> > > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >
>
>
> --
> Maciek Bryński
>


-- 
Best Regards

Jeff Zhang


Re: No result shown when submitting the SQL in cli

2021-05-11 Thread Jeff Zhang
The result is printed in the TM.
In the IDE it is local mode, so the TM runs in your local JVM; that's why you
see the result there.
While in distributed mode (either YARN or standalone mode), when you are in
sql-client, you should be able to see the result in the TM logs.
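
If the goal is to see the rows on the client that submitted the job, one option is to iterate the result instead of relying on where the print output lands; a sketch, assuming a Flink version with TableResult#collect (1.11+), where `tableEnv` stands for the table environment from the snippet below:

```
// Collect SELECT results back to the submitting client and print them there.
val tableResult = tableEnv.executeSql("SELECT * FROM t1")
val it = tableResult.collect()   // CloseableIterator[Row]
try {
  while (it.hasNext) {
    println(it.next())
  }
} finally {
  it.close()  // important for unbounded queries: closing cancels the job
}
```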


tao xiao  于2021年5月11日周二 下午11:40写道:

> Does anyone help with this question?
>
> On Thu, May 6, 2021 at 3:26 PM tao xiao  wrote:
>
>> Hi team,
>>
>> I wrote a simple SQL job to select data from Kafka. I can see results
>> printing out in IDE but when I submit the job to a standalone cluster in
>> CLI there is no result shown. I am sure the job is running well in the
>> cluster with debug log suggesting that the kafka consumer is fetching data
>> from Kafka. I enabled debug log in CLI and I don't see any obvious log.
>> Here is the job code snippet
>>
>> public static void main(String[] args) throws Exception {
>>   StreamTableEnvironment tableEnv = StreamTableEnvironment
>>   
>> .create(StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1));
>>
>>   String sqls = new String(Files.readAllBytes(Paths.get(args[0])));
>>   splitIgnoreQuota(sqls, ';').forEach(sql -> {
>> TableResult tableResult = tableEnv.executeSql(sql);
>> tableResult.print();
>>   });
>> }
>>
>> It simply parses a sql file and execute the statements
>>
>> Here is the SQL statements
>>
>> CREATE TABLE t1 (
>>   `f1` STRING,
>>   `f2` STRING
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'topic',
>>   'properties.group.id' = 'test1',
>>   'properties.max.partition.fetch.bytes' = '16384',
>>   'properties.enable.auto.commit' = 'false',
>>   'properties.bootstrap.servers' = 'kafka:9092',
>>   'scan.startup.mode' = 'earliest-offset',
>>   'format' = 'json'
>> );
>>
>> SELECT * FROM t1
>>
>>
>> Below is the result I got from IDE
>> | +I | b8f5 |   abcd |
>> | +I | b8f5 |   abcd |
>>
>> And this is the result from CLI
>> bin/flink run  -m localhost:8081 -c kafka.sample.flink.SQLSample
>> ~/workspaces/kafka-sample/target/kafka-sample-0.1.2-jar-with-dependencies.jar
>> /sample.sql
>> ++
>> | result |
>> ++
>> | OK |
>> ++
>> 1 row in set
>> Job has been submitted with JobID ace45d2ff850675243e2663d3bf11701
>> ++++
>> | op |   uuid |ots |
>> ++++
>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Regards,
> Tao
>


-- 
Best Regards

Jeff Zhang


[Announce] Zeppelin 0.9.0 is released (Flink on Zeppelin)

2021-01-17 Thread Jeff Zhang
Hi flink folks,

I'd like to tell you that Zeppelin 0.9.0 is officially released. In this new
version we did a big refactoring and improvement of the Flink support. It
supports 3 major versions of Flink (1.10, 1.11, 1.12).

You can download zeppelin 0.9.0 here.
http://zeppelin.apache.org/download.html

And check the following 2 links for more details of how to use flink on
zeppelin
https://app.gitbook.com/@jeffzhang/s/flink-on-zeppelin/
http://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html


-- 
Best Regards

Jeff Zhang


Re: Safer handling of Scala immutable collections

2020-10-14 Thread Jeff Zhang
Could you share your code to reproduce it?

Rex Fenley  于2020年10月15日周四 上午5:54写道:

> Hello,
>
> I've been playing with UDFs using the Scala API and have repeatedly run
> into issues such as this:
> ```
> flink-taskmanager_1| java.lang.ClassCastException:
> scala.collection.immutable.Set$EmptySet$ cannot be cast to [J
> ```
> Is there something that can be done on Flink's end, either to catch these
> errors in type checking or to cast them in a sane manner during runtime?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 
Best Regards

Jeff Zhang


Re: PyFlink data query

2020-06-09 Thread Jeff Zhang
You can use Zeppelin's z.show to query the job result. Here is a
getting-started tutorial for PyFlink on Zeppelin:
https://www.bilibili.com/video/BV1Te411W73b?p=20
You can also join the DingTalk group to discuss: 30022475



jack wrote on Tue, Jun 9, 2020 at 5:28 PM:

> A question, please:
> Description: PyFlink reads from a source and performs SQL queries,
> aggregations, and similar operations on the data. Instead of writing the
> result to a sink, I would like to use it directly as the result, so that I
> can query it through a web API I develop, without having to query a sink.
>
> Can Flink support this kind of usage?
> Thanks
>


-- 
Best Regards

Jeff Zhang


Re: Run command after Batch is finished

2020-06-06 Thread Jeff Zhang
It would run on the client side, where the ExecutionEnvironment is created.

Mark Davis  于2020年6月6日周六 下午8:14写道:

> Hi Jeff,
>
> Thank you very much! That is exactly what I need.
>
> Where the listener code will run in the cluster deployment(YARN, k8s)?
> Will it be sent over the network?
>
> Thank you!
>
>   Mark
>
> ‐‐‐ Original Message ‐‐‐
> On Friday, June 5, 2020 6:13 PM, Jeff Zhang  wrote:
>
> You can try JobListener which you can register to ExecutionEnvironment.
>
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>
>
> Mark Davis  于2020年6月6日周六 上午12:00写道:
>
>> Hi there,
>>
>> I am running a Batch job with several outputs.
>> Is there a way to run some code(e.g. release a distributed lock) after
>> all outputs are finished?
>>
>> Currently I do this in a try-finally block around
>> ExecutionEnvironment.execute() call, but I have to switch to the detached
>> execution mode - in this mode the finally block is never run.
>>
>> Thank you!
>>
>>   Mark
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Run command after Batch is finished

2020-06-05 Thread Jeff Zhang
You can try JobListener, which you can register with the ExecutionEnvironment.

https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
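
A minimal sketch of what that looks like (Flink 1.10+; the lock release is a placeholder for your own cleanup):

```
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.core.execution.{JobClient, JobListener}

val env = ExecutionEnvironment.getExecutionEnvironment

env.registerJobListener(new JobListener {
  override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
    // runs on the client right after submission
  }
  override def onJobExecuted(result: JobExecutionResult, throwable: Throwable): Unit = {
    // runs on the client once the job finishes or fails;
    // release the distributed lock here (placeholder)
  }
})
```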


Mark Davis  于2020年6月6日周六 上午12:00写道:

> Hi there,
>
> I am running a Batch job with several outputs.
> Is there a way to run some code(e.g. release a distributed lock) after all
> outputs are finished?
>
> Currently I do this in a try-finally block around
> ExecutionEnvironment.execute() call, but I have to switch to the detached
> execution mode - in this mode the finally block is never run.
>
> Thank you!
>
>   Mark
>


-- 
Best Regards

Jeff Zhang


Re: Table Environment for Remote Execution

2020-06-03 Thread Jeff Zhang
Hi Satyam,

I also met the same issue when I integrated Flink with Zeppelin. Here's
what I did:

https://github.com/apache/zeppelin/blob/master/flink/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java#L226
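
For the streaming case, the public factory methods already take a (possibly remote) StreamExecutionEnvironment; a rough sketch with placeholder host/port/jar (package names as of Flink 1.11+):

```
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Placeholders: JobManager REST endpoint and a jar with the job's classes.
val env = StreamExecutionEnvironment.createRemoteEnvironment(
  "jm-host", 8081, "/path/to/job.jar")
val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = StreamTableEnvironment.create(env, settings)
```

The batch-mode-plus-remote combination is exactly the gap discussed in the quoted thread, which is what the StreamTableEnvironmentImpl constructor workaround and FLINK-18095 are about.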

If you are interested in Flink on Zeppelin, you can refer to the following
blogs and videos.

Flink on Zeppelin video
https://www.youtube.com/watch?v=YxPo0Fosjjg&list=PL4oy12nnS7FFtg3KV1iS5vDb0pTz12VcX

Flink on Zeppelin tutorial blogs:
1) Get started https://link.medium.com/oppqD6dIg5
2) Batch https://link.medium.com/3qumbwRIg5
3) Streaming https://link.medium.com/RBHa2lTIg5
4) Advanced usage https://link.medium.com/CAekyoXIg5



Satyam Shekhar  于2020年6月4日周四 上午2:27写道:

>
> Thanks, Jark & Godfrey.
>
> The workaround was successful.
>
> I have created the following ticket to track the issue -
> https://issues.apache.org/jira/browse/FLINK-18095
>
> Regards,
> Satyam
>
> On Wed, Jun 3, 2020 at 3:26 AM Jark Wu  wrote:
>
>> Hi Satyam,
>>
>> In the long term, TableEnvironment is the entry point for pure Table/SQL
>> users. So it should have all the ability of StreamExecutionEnvironment.
>> I think remote execution is a reasonable feature, could you create an
>> JIRA issue for this?
>>
>> As a workaround, you can construct `StreamTableEnvironmentImpl` by
>> yourself via constructor, it can support batch mode
>> and StreamExecutionEnvironment.
>>
>> Best,
>> Jark
>>
>>
>> On Wed, 3 Jun 2020 at 16:35, Satyam Shekhar 
>> wrote:
>>
>>> Thanks for the reply, Godfrey.
>>>
>>> I would also love to learn the reasoning behind that limitation.
>>>
>>> For more context, I am building a Java application that receives some
>>> user input via a GRPC service. The user's input is translated to some SQL
>>> that may be executed in streaming or batch mode based on custom business
>>> logic and submitted it to Flink for execution. In my current setup, I
>>> create an ExecutionEnvironment, register sources, and execute the
>>> corresponding SQL. I was able to achieve the desired behavior with
>>> StreamTableEnvironment but it has limitations around supported SQL in batch
>>> mode.
>>>
>>> While invoking the CLI from java program might be a solution, it doesn't
>>> feel like the most natural solution for the problem. Are there other ways
>>> to address this?
>>>
>>> Regards,
>>> Satyam
>>>
>>> On Wed, Jun 3, 2020 at 12:50 AM godfrey he  wrote:
>>>
>>>> Hi Satyam,
>>>>
>>>> for blink batch mode, only TableEnvironment can be used,
>>>> and TableEnvironment do not take StreamExecutionEnvironment as argument.
>>>> Instead StreamExecutionEnvironment instance is created internally.
>>>>
>>>> back to your requirement, you can build your table program as user jar,
>>>> and submit the job through flink cli [1] to remote environment.
>>>>
>>>> Bests,
>>>> Godfrey
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
>>>>
>>>>
>>>>
>>>> Satyam Shekhar  于2020年6月3日周三 下午2:59写道:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am running into a very basic problem while working with Table API. I
>>>>> wish to create a TableEnvironment connected to a remote environment that
>>>>> uses Blink planner in batch mode. Examples and documentation I have come
>>>>> across so far recommend the following pattern to create such an 
>>>>> environment
>>>>> -
>>>>>
>>>>> var settings = EnvironmentSettings.newInstance()
>>>>>   .useBlinkPlanner()
>>>>>   .inBatchMode()
>>>>>   .build();
>>>>> var tEnv = TableEnvironment.create(settings);
>>>>>
>>>>> The above configuration, however, does not connect to a remote
>>>>> environment. Tracing code in TableEnvironment.java, I see the
>>>>> following method in BlinkExecutorFactory.java that appears to
>>>>> relevant -
>>>>>
>>>>> Executor create(Map, StreamExecutionEnvironment);
>>>>>
>>>>> However, it seems to be only accessible through the Scala bridge. I
>>>>> can't seem to find a way to instantiate a TableEnvironment that takes
>>>>> StreamExecutionEnvironment as an argument. How do I achieve that?
>>>>>
>>>>> Regards,
>>>>> Satyam
>>>>>
>>>>

-- 
Best Regards

Jeff Zhang


Re: What's the best practice to determine whether a job has finished or not?

2020-05-08 Thread Jeff Zhang
I use JobListener#onJobExecuted to be notified that the Flink job is done.
It is pretty reliable for me; the only exception is when the client process
goes down.

BTW, the reason you see the ApplicationNotFound exception is that the YARN
app is terminated, which means the Flink cluster is shut down, while in
standalone mode the Flink cluster is always up.


Caizhi Weng  于2020年5月8日周五 下午2:47写道:

> Hi dear Flink community,
>
> I would like to determine whether a job has finished (no matter
> successfully or exceptionally) in my code.
>
> I used to think that JobClient#getJobStatus is a good idea, but I found
> that it behaves quite differently under different executing environments.
> For example, under a standalone session cluster it will return the FINISHED
> status for a finished job, while under a yarn per job cluster it will throw
> a ApplicationNotFound exception. I'm afraid that there might be other
> behaviors for other environments.
>
> So what's the best practice to determine whether a job has finished or
> not? Note that I'm not waiting for the job to finish. If the job hasn't
> finished I would like to know it and do something else.
>


-- 
Best Regards

Jeff Zhang


Re: Enable custom REST APIs in Flink

2020-04-21 Thread Jeff Zhang
I know some users do the same thing in Spark. Usually the service runs on the
Spark driver side. But Flink is different from Spark: the Spark driver is
roughly equal to the Flink client + the Flink JobManager. I don't think we
currently allow running any user code in the JobManager, so allowing a
user-defined service to run in the JobManager might be a big change for Flink.



Flavio Pompermaier  于2020年4月21日周二 下午11:06写道:

> In my mind the user API could run everywhere but the simplest thing is to
> make them available in the Job Manager (where the other REST API lives).
> They could become a very simple but powerful way to add valuable services
> to Flink without adding useless complexity to the overall architecture for
> just a few methods.
> I don't know whether Spark or Beam allow you to do something like that but
> IMHO it's super useful (especially from a maintenance point of view wrt the
> overall architecture complexity).
>
> @Oytun indeed we'd like to avoid recompiling everything when a single user
> class (i.e. not related to Flink classes) is modified or added. Glad to see
> that there are other people having the same problem here
>
> On Tue, Apr 21, 2020 at 4:39 PM Jeff Zhang  wrote:
>
>> Hi Flavio,
>>
>> I am curious to know where the service runs. Do you create this service in
>> a UDF and run it in the TM?
>>
>> Flavio Pompermaier  于2020年4月21日周二 下午8:30写道:
>>
>>> Hi to all,
>>> many times it happens that we use Flink as a broker towards the data
>>> layer but we need to be able to get some specific info from the
>>> data sources we use (i.e. get triggers and relationships from jdbc).
>>> The quick and dirty way of achieving this is to run a Flink job that
>>> calls another service to store the required info. Another solution could be
>>> to add a custom REST service that contains a lot of dependencies already
>>> provided by Flink, with the risk of having misaligned versions between the
>>> 2..
>>> It would be much simpler to enable users to add custom REST services to
>>> Flink in a configurable file. something like:
>>> /myservice1/* -> com.example.MyRestService1
>>> /myservice2/* -> com.example.MyRestService2
>>>
>>> The listed classes should be contained in a jar within the Flink lib dir
>>> and should implement a common interface.
>>> In order to avoid path collisions with already existing FLINK services,
>>> the configured path can be further prefixed with some other token (e.g.
>>> /userapi/*).
>>>
>>> What do you think about this? Does it sound reasonable to you?
>>> Am I the only one that thinks this could be useful for many use cases?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


Re: Enable custom REST APIs in Flink

2020-04-21 Thread Jeff Zhang
Hi Flavio,

I am curious to know where the service runs. Do you create this service in a
UDF and run it in the TM?

Flavio Pompermaier  于2020年4月21日周二 下午8:30写道:

> Hi to all,
> many times it happens that we use Flink as a broker towards the data layer
> but we need to be able to get some specific info from the data sources we
> use (i.e. get triggers and relationships from jdbc).
> The quick and dirty way of achieving this is to run a Flink job that calls
> another service to store the required info. Another solution could be to
> add a custom REST service that contains a lot of dependencies already
> provided by Flink, with the risk of having misaligned versions between the
> 2..
> It would be much simpler to enable users to add custom REST services to
> Flink in a configurable file. something like:
> /myservice1/* -> com.example.MyRestService1
> /myservice2/* -> com.example.MyRestService2
>
> The listed classes should be contained in a jar within the Flink lib dir
> and should implement a common interface.
> In order to avoid path collisions with already existing FLINK services,
> the configured path can be further prefixed with some other token (e.g.
> /userapi/*).
>
> What do you think about this? Does it sound reasonable to you?
> Am I the only one that thinks this could be useful for many use cases?
>
> Best,
> Flavio
>


-- 
Best Regards

Jeff Zhang


Re: FLINK JOB

2020-04-20 Thread Jeff Zhang
I see, so you are running the Flink interpreter in local mode, but you access
Zeppelin from a remote machine, right? Do you mean you can access it after
changing localhost to the IP? If so, I can add a configuration on the
Zeppelin side to replace localhost with the real IP.

Som Lima  于2020年4月20日周一 下午4:44写道:

> I am only running the zeppelin  word count example by clicking the
> zeppelin run arrow.
>
>
> On Mon, 20 Apr 2020, 09:42 Jeff Zhang,  wrote:
>
>> How do you run flink job ? It should not always be localhost:8081
>>
>> Som Lima  于2020年4月20日周一 下午4:33写道:
>>
>>> Hi,
>>>
>>> FLINK JOB  url  defaults to localhost
>>>
>>> i.e. localhost:8081.
>>>
>>> I have to manually change it to server :8081 to get Apache  flink
>>> Web Dashboard to display.
>>>
>>>
>>>
>>>
>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


Re: FLINK JOB

2020-04-20 Thread Jeff Zhang
How do you run the Flink job? It should not always be localhost:8081.

Som Lima  于2020年4月20日周一 下午4:33写道:

> Hi,
>
> FLINK JOB  url  defaults to localhost
>
> i.e. localhost:8081.
>
> I have to manually change it to server :8081 to get Apache  flink  Web
> Dashboard to display.
>
>
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Job manager URI rpc address:port

2020-04-19 Thread Jeff Zhang
Glad to hear that.

Som Lima  于2020年4月20日周一 上午8:08写道:

> I will thanks.  Once I had it set up and working.
> I switched  my computers around from client to server to server to client.
> With your excellent instructions I was able to do it in 5 .minutes
>
> On Mon, 20 Apr 2020, 00:05 Jeff Zhang,  wrote:
>
>> Som, Let us know when you have any problems
>>
>> Som Lima  于2020年4月20日周一 上午2:31写道:
>>
>>> Thanks for the info and links.
>>>
>>> I had a lot of problems I am not sure what I was doing wrong.
>>>
>>> May be conflicts with setup from apache spark.  I think I may need to
>>> setup users for each development.
>>>
>>>
>>> Anyway I kept doing fresh installs about four altogether I think.
>>>
>>> Everything works fine now
>>> Including remote access  of zeppelin on machines across the local area
>>> network.
>>>
>>> Next step  setup remote clusters
>>>  Wish me luck !
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sun, 19 Apr 2020, 14:58 Jeff Zhang,  wrote:
>>>
>>>> Hi Som,
>>>>
>>>> You can take a look at flink on zeppelin, in zeppelin you can connect
>>>> to a remote flink cluster via a few configuration, and you don't need to
>>>> worry about the jars. Flink interpreter will ship necessary jars for you.
>>>> Here's a list of tutorials.
>>>>
>>>> 1) Get started https://link.medium.com/oppqD6dIg5
>>>> <https://t.co/PTouUYYTrv?amp=1> 2) Batch https://
>>>> link.medium.com/3qumbwRIg5 <https://t.co/Yo9QAY0Joj?amp=1> 3)
>>>> Streaming https://link.medium.com/RBHa2lTIg5
>>>> <https://t.co/sUapN40tvI?amp=1> 4) Advanced usage https://
>>>> link.medium.com/CAekyoXIg5 <https://t.co/MXolULmafZ?amp=1>
>>>>
>>>>
>>>> Zahid Rahman  于2020年4月19日周日 下午7:27写道:
>>>>
>>>>> Hi Tison,
>>>>>
>>>>> I think I may have found what I want in example 22.
>>>>>
>>>>> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>>>>>
>>>>> I need to create Configuration object first as shown .
>>>>>
>>>>> Also I think  flink-conf.yaml file may contain configuration for
>>>>> client rather than  server. So before starting is irrelevant.
>>>>> I am going to play around and see but if the Configuration class
>>>>> allows me to set configuration programmatically and overrides the yaml 
>>>>> file
>>>>> then that would be great.
>>>>>
>>>>>
>>>>>
>>>>> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>>>>>
>>>>>> Thanks.
>>>>>> flink-conf.yaml does allow me to do what I need to do without making
>>>>>> any changes to client source code.
>>>>>>
>>>>>> But
>>>>>> RemoteStreamEnvironment constructor  expects a jar file as the third
>>>>>> parameter also.
>>>>>>
>>>>>> RemoteStreamEnvironment
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.html#RemoteStreamEnvironment-java.lang.String-int-java.lang.String...->
>>>>>> (String
>>>>>> <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true>
>>>>>>  host,
>>>>>> int port, String
>>>>>> <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true>
>>>>>> ... jarFiles)
>>>>>> Creates a new RemoteStreamEnvironment that points to the master
>>>>>> (JobManager) described by the given host name and port.
>>>>>>
>>>>>> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>>>>>>
>>>>>>> You can change flink-conf.yaml "jobmanager.address" or
>>>>>>> "jobmanager.port" options before run the program or take a look at
>>>>>>> RemoteStreamEnvironment which enables configuring host and port.
>>>>>>>
>>>>>>> Best,
>>>>>>> tison.
>>>>>>>
>>>>>>>
>>>>>>> Som Lima  于2020年4月19日周日 下午5:58写道:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> After running
>>>>>>>>
>>>>>>>> $ ./bin/start-cluster.sh
>>>>>>>>
>>>>>>>> The following line of code defaults jobmanager  to localhost:6123
>>>>>>>>
>>>>>>>> final  ExecutionEnvironment env =
>>>>>>>> Environment.getExecutionEnvironment();
>>>>>>>>
>>>>>>>> which is same on spark.
>>>>>>>>
>>>>>>>> val spark =
>>>>>>>> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>>>>>>>>
>>>>>>>> However if I wish to run the servers on a different physical
>>>>>>>> computer.
>>>>>>>> Then in Spark I can do it this way using the spark URI in my IDE.
>>>>>>>>
>>>>>>>> Conf =
>>>>>>>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>>>>>>>
>>>>>>>> Can you please tell me the equivalent change to make so I can run
>>>>>>>> my servers and my IDE from different physical computers.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


Re: Job manager URI rpc address:port

2020-04-19 Thread Jeff Zhang
Som, let us know if you have any problems.

Som Lima  于2020年4月20日周一 上午2:31写道:

> Thanks for the info and links.
>
> I had a lot of problems I am not sure what I was doing wrong.
>
> May be conflicts with setup from apache spark.  I think I may need to
> setup users for each development.
>
>
> Anyway I kept doing fresh installs about four altogether I think.
>
> Everything works fine now
> Including remote access  of zeppelin on machines across the local area
> network.
>
> Next step  setup remote clusters
>  Wish me luck !
>
>
>
>
>
>
>
> On Sun, 19 Apr 2020, 14:58 Jeff Zhang,  wrote:
>
>> Hi Som,
>>
>> You can take a look at flink on zeppelin, in zeppelin you can connect to
>> a remote flink cluster via a few configuration, and you don't need to worry
>> about the jars. Flink interpreter will ship necessary jars for you. Here's
>> a list of tutorials.
>>
>> 1) Get started https://link.medium.com/oppqD6dIg5
>> <https://t.co/PTouUYYTrv?amp=1> 2) Batch https://
>> link.medium.com/3qumbwRIg5 <https://t.co/Yo9QAY0Joj?amp=1> 3) Streaming
>> https://link.medium.com/RBHa2lTIg5 <https://t.co/sUapN40tvI?amp=1> 4)
>> Advanced usage https://link.medium.com/CAekyoXIg5
>> <https://t.co/MXolULmafZ?amp=1>
>>
>>
>> Zahid Rahman  于2020年4月19日周日 下午7:27写道:
>>
>>> Hi Tison,
>>>
>>> I think I may have found what I want in example 22.
>>>
>>> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>>>
>>> I need to create Configuration object first as shown .
>>>
>>> Also I think  flink-conf.yaml file may contain configuration for client
>>> rather than  server. So before starting is irrelevant.
>>> I am going to play around and see but if the Configuration class allows
>>> me to set configuration programmatically and overrides the yaml file then
>>> that would be great.
>>>
>>>
>>>
>>> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>>>
>>>> Thanks.
>>>> flink-conf.yaml does allow me to do what I need to do without making
>>>> any changes to client source code.
>>>>
>>>> But
>>>> RemoteStreamEnvironment constructor  expects a jar file as the third
>>>> parameter also.
>>>>
>>>> RemoteStreamEnvironment
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.html#RemoteStreamEnvironment-java.lang.String-int-java.lang.String...->
>>>> (String
>>>> <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true>
>>>>  host,
>>>> int port, String
>>>> <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true>
>>>> ... jarFiles)
>>>> Creates a new RemoteStreamEnvironment that points to the master
>>>> (JobManager) described by the given host name and port.
>>>>
>>>> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>>>>
>>>>> You can change flink-conf.yaml "jobmanager.address" or
>>>>> "jobmanager.port" options before run the program or take a look at
>>>>> RemoteStreamEnvironment which enables configuring host and port.
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>>
>>>>> Som Lima  于2020年4月19日周日 下午5:58写道:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> After running
>>>>>>
>>>>>> $ ./bin/start-cluster.sh
>>>>>>
>>>>>> The following line of code defaults jobmanager  to localhost:6123
>>>>>>
>>>>>> final  ExecutionEnvironment env =
>>>>>> Environment.getExecutionEnvironment();
>>>>>>
>>>>>> which is same on spark.
>>>>>>
>>>>>> val spark =
>>>>>> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>>>>>>
>>>>>> However if I wish to run the servers on a different physical computer.
>>>>>> Then in Spark I can do it this way using the spark URI in my IDE.
>>>>>>
>>>>>> Conf =
>>>>>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>>>>>
>>>>>> Can you please tell me the equivalent change to make so I can run my
>>>>>> servers and my IDE from different physical computers.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


Re: Job manager URI rpc address:port

2020-04-19 Thread Jeff Zhang
Hi Som,

You can take a look at Flink on Zeppelin. In Zeppelin you can connect to a
remote Flink cluster via a few configuration settings, and you don't need to
worry about the jars; the Flink interpreter will ship the necessary jars for
you. Here's a list of tutorials:

1) Get started https://link.medium.com/oppqD6dIg5
2) Batch https://link.medium.com/3qumbwRIg5
3) Streaming https://link.medium.com/RBHa2lTIg5
4) Advanced usage https://link.medium.com/CAekyoXIg5
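
Outside Zeppelin, the programmatic route discussed in the quoted thread looks roughly like this (host, port and jar path are placeholders; in recent Flink versions the port is the JobManager's REST port, 8081 by default):

```
import org.apache.flink.streaming.api.scala._

// Point the client at a remote JobManager instead of the default localhost.
// The listed jar(s) contain the user code and are shipped to the cluster.
val env = StreamExecutionEnvironment.createRemoteEnvironment(
  "flink-master-host", 8081, "target/my-job.jar")

env.fromElements(1, 2, 3).map(_ * 2).print()
env.execute("remote-environment-sketch")
```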


Zahid Rahman  于2020年4月19日周日 下午7:27写道:

> Hi Tison,
>
> I think I may have found what I want in example 22.
>
> https://www.programcreek.com/java-api-examples/?api=org.apache.flink.configuration.Configuration
>
> I need to create Configuration object first as shown .
>
> Also I think  flink-conf.yaml file may contain configuration for client
> rather than  server. So before starting is irrelevant.
> I am going to play around and see but if the Configuration class allows me
> to set configuration programmatically and overrides the yaml file then that
> would be great.
>
>
>
> On Sun, 19 Apr 2020, 11:35 Som Lima,  wrote:
>
>> Thanks.
>> flink-conf.yaml does allow me to do what I need to do without making any
>> changes to client source code.
>>
>> But
>> RemoteStreamEnvironment constructor  expects a jar file as the third
>> parameter also.
>>
>> RemoteStreamEnvironment
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.html#RemoteStreamEnvironment-java.lang.String-int-java.lang.String...->
>> (String
>> <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true>
>>  host,
>> int port, String
>> <http://docs.oracle.com/javase/7/docs/api/java/lang/String.html?is-external=true>
>> ... jarFiles)
>> Creates a new RemoteStreamEnvironment that points to the master
>> (JobManager) described by the given host name and port.
>>
>> On Sun, 19 Apr 2020, 11:02 tison,  wrote:
>>
>>> You can change flink-conf.yaml "jobmanager.address" or "jobmanager.port"
>>> options before run the program or take a look at RemoteStreamEnvironment
>>> which enables configuring host and port.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Som Lima  于2020年4月19日周日 下午5:58写道:
>>>
>>>> Hi,
>>>>
>>>> After running
>>>>
>>>> $ ./bin/start-cluster.sh
>>>>
>>>> The following line of code defaults jobmanager  to localhost:6123
>>>>
>>>> final  ExecutionEnvironment env = Environment.getExecutionEnvironment();
>>>>
>>>> which is same on spark.
>>>>
>>>> val spark =
>>>> SparkSession.builder.master(local[*]).appname("anapp").getOrCreate
>>>>
>>>> However if I wish to run the servers on a different physical computer.
>>>> Then in Spark I can do it this way using the spark URI in my IDE.
>>>>
>>>> Conf =
>>>> SparkConf().setMaster("spark://:").setAppName("anapp")
>>>>
>>>> Can you please tell me the equivalent change to make so I can run my
>>>> servers and my IDE from different physical computers.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>

-- 
Best Regards

Jeff Zhang


Re: Flink SQL Gateway

2020-04-16 Thread Jeff Zhang
Hi Flavio,

If you would like to have a UI to register data sources, run Flink SQL,
and preview the SQL result, then you can use Zeppelin directly. You can
check the tutorials here:
1) Get started https://link.medium.com/oppqD6dIg5
2) Batch https://link.medium.com/3qumbwRIg5
3) Streaming https://link.medium.com/RBHa2lTIg5
4) Advanced usage https://link.medium.com/CAekyoXIg5

And here's an article shared by someone else about how to use Flink on
Zeppelin:
https://medium.com/@abdelkrim.hadjidj/event-driven-supply-chain-for-crisis-with-flinksql-be80cb3ad4f9

Besides that, Zeppelin provides a REST API which you can use to integrate
with other systems, but it is not the standard JDBC protocol.
http://zeppelin.apache.org/docs/0.9.0-preview1/usage/rest_api/notebook.html

And I have been making more improvements recently; I will reveal more details
at next week's Flink Forward.
https://www.flink-forward.org/sf-2020/conference-program#it%E2%80%99s-finally-here--python-on-flink---flink-on-zeppelin





Flavio Pompermaier  于2020年4月16日周四 下午8:24写道:

> Basically we want to give a UI to the user to register its data sources
> (i.e. catalogs in the Flink world), preview them (SELECT * LIMIT 100 for
> example) but, in the case of JDBC catalogs, also to see relationships and
> triggers.
> We don't want to reimplement the wheel so we would like to reuse and
> contribute to Flink as much as possible (since then in the batch jobs we
> use Flink and we don't like to do the same work twice..).
> In this way we can contribute to Flink if something is missing in the SQL
> Gateway. However I don't know how to extend the existing stuff (for example
> if I want table relationships and triggers)..
>
> Best,
> Flavio
>
> On Thu, Apr 16, 2020 at 1:38 PM godfrey he  wrote:
>
>> Hi Flavio,
>>
>> Since 1.11(master), Flink supports "CREATE CATALOG ..." [1],
>> we can use this statement create catalog dynamically.
>>
>> Currently, Catalog[2] dose not supports any operations on TRIGGER.
>> Flink can't also use such info now. What's your user scenario?
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15349
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/catalogs.html#catalog-api
>>
>> Best,
>> Godfrey
>>
>> Flavio Pompermaier  于2020年4月16日周四 下午6:16写道:
>>
>>> Hi Godfrey,
>>> I'd like to use the SQL gateway as a data proxy in our architecture.
>>> However, catalogs in our use case are not know at configuration time..
>>> is there a way to permit to register a JDBC catalog (for example when I
>>> want to connect to a Postgres database)?
>>> What if I want to add SHOW TRIGGERS? Do you think it could be
>>> interesting?
>>>
>>> On Thu, Apr 16, 2020 at 10:55 AM godfrey he  wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> We prose FLIP-91[1] to support SQL Gateway at the beginning of this
>>>> year.
>>>> After a long discussion, we reached an agreement that
>>>> SQL Gateway is an eco-system under ververia as first step.[2]
>>>> Which could help SQL Gateway move forward faster.
>>>> Now we almost finish first version development, some users are trying
>>>> it out.
>>>> Any suggestions are welcome!
>>>>
>>>> [1]
>>>> https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit?ts=5e4cd711#heading=h.sru3xzbt2i1k
>>>> [2] https://github.com/ververica/flink-sql-gateway
>>>>
>>>> Best,
>>>> Godfrey
>>>>
>>>> Flavio Pompermaier  于2020年4月16日周四 下午4:42写道:
>>>>
>>>>> Hi Jeff,
>>>>> FLIP-24 [1] proposed to develop a SQL gateway to query Flink via SQL
>>>>> but since then no progress has been made on that point. Do you think that
>>>>> Zeppelin could be used somehow as a SQL Gateway towards Flink for the
>>>>> moment?
>>>>> Any chance that a Flink SQL Gateway could ever be developed? Is there
>>>>> anybody interested in this?
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> [1]
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client
>>>>>
>>>>

-- 
Best Regards

Jeff Zhang


[ANNOUNCE] Flink on Zeppelin (Zeppelin 0.9 is released)

2020-03-30 Thread Jeff Zhang
Hi Folks,

I am very excited to announce the integration work of flink on apache
zeppelin notebook is completed. You can now run flink jobs via datastream
api, table api, sql, pyflink in apache apache zeppelin notebook. Download
it here http://zeppelin.apache.org/download.html),

Here are some highlights of this work:

1. Support for 3 kinds of execution modes: local, remote, yarn
2. Support for multiple languages in one Flink session: Scala, Python, SQL
3. Hive connector support (reading from and writing to Hive)
4. Dependency management
5. UDF support (Scala, PyFlink)
6. Support for both batch SQL and streaming SQL

For more details and usage instructions, you can refer to the following 4 blog posts:

1) Get started: https://link.medium.com/oppqD6dIg5
2) Batch: https://link.medium.com/3qumbwRIg5
3) Streaming: https://link.medium.com/RBHa2lTIg5
4) Advanced usage: https://link.medium.com/CAekyoXIg5

You are welcome to use Flink on Zeppelin and to give feedback and comments.

-- 
Best Regards

Jeff Zhang


Re: [Third-party Tool] Flink memory calculator

2020-03-29 Thread Jeff Zhang
Hi Yangze,

Does this tool just parse the configuration in flink-conf.yaml? Maybe it
could be done in JobListener [1] (we should enhance it by adding a hook
before job submission), so that it could cover all the cases (e.g. parameters
coming from the command line).

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java#L35
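
To make the idea a bit more concrete, here is a minimal sketch against the current JobListener interface (Scala calling the Java API). Note that today the listener is only notified after submission and after execution; the pre-submission hook mentioned above would be a new addition, and the check inside the callback is only a placeholder:

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.registerJobListener(new JobListener {
  override def onJobSubmitted(jobClient: JobClient, t: Throwable): Unit = {
    // Placeholder: run the memory calculation/validation against the effective
    // configuration here, so that command-line parameters are also covered.
  }
  override def onJobExecuted(result: JobExecutionResult, t: Throwable): Unit = ()
})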


Yangze Guo wrote on Mon, Mar 30, 2020 at 9:40 AM:

> Hi, Yun,
>
> I'm sorry that it currently could not handle it. But I think it is a
> really good idea and that feature would be added to the next version.
>
> Best,
> Yangze Guo
>
> On Mon, Mar 30, 2020 at 12:21 AM Yun Tang  wrote:
> >
> > Very interesting and convenient tool, just a quick question: could this
> tool also handle deployment cluster commands like "-tm" mixed with
> configuration in `flink-conf.yaml` ?
> >
> > Best
> > Yun Tang
> > 
> > From: Yangze Guo 
> > Sent: Friday, March 27, 2020 18:00
> > To: user ; user...@flink.apache.org <
> user...@flink.apache.org>
> > Subject: [Third-party Tool] Flink memory calculator
> >
> > Hi, there.
> >
> > In release-1.10, the memory setup of task managers has changed a lot.
> > I would like to provide here a third-party tool to simulate and get
> > the calculation result of Flink's memory configuration.
> >
> >  Although there is already a detailed setup guide[1] and migration
> > guide[2] officially, the calculator could further allow users to:
> > - Verify if there is any conflict in their configuration. The
> > calculator is more lightweight than starting a Flink cluster,
> > especially when running Flink on Yarn/Kubernetes. User could make sure
> > their configuration is correct locally before deploying it to external
> > resource managers.
> > - Get all of the memory configurations before deploying. User may set
> > taskmanager.memory.task.heap.size and taskmanager.memory.managed.size.
> > But they also want to know the total memory consumption of Flink. With
> > this tool, users could get all of the memory configurations they are
> > interested in. If anything is unexpected, they would not need to
> > re-deploy a Flink cluster.
> >
> > The repo link of this tool is
> > https://github.com/KarmaGYZ/flink-memory-calculator. It reuses the
> > BashJavaUtils.jar of Flink and ensures the calculation result is
> > exactly the same as your Flink dist. For more details, please take a
> > look at the README.
> >
> > Any feedback or suggestion is welcomed!
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_setup.html
> > [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/memory/mem_migration.html
> >
> > Best,
> > Yangze Guo
>


-- 
Best Regards

Jeff Zhang


Re: Scala Shell gives error "rest.address must be set"

2020-03-17 Thread Jeff Zhang
I agree, this is really confusing for users. Would you mind creating a ticket
for that?

Craig Foster wrote on Wed, Mar 18, 2020 at 8:36 AM:

> If I specify these options, it seems to work...but I thought I could
> have this dynamically determined when submitting jobs just using the
> "yarn" option:
>
> /usr/lib/flink/bin/start-scala-shell.sh yarn -s 4 -jm 1024m -tm 4096m
>
> I guess what isn't clear here to me is that if you use `yarn` alone
> there needs to be an existing yarn cluster already started.
>
>
> On Tue, Mar 17, 2020 at 4:22 PM Craig Foster 
> wrote:
> >
> > Yeah, I was wondering about that. I'm using
> > `/usr/lib/flink/bin/start-scala-shell.sh yarn`-- previously I'd use
> > `/usr/lib/flink/bin/start-scala-shell.sh yarn -n ${NUM}`
> >  but that deprecated option was removed.
> >
> >
> > On Tue, Mar 17, 2020 at 4:11 PM Jeff Zhang  wrote:
> > >
> > > It looks like you are running under standalone mode, what is your
> command to start scala shell. ?
> > >
> > > Craig Foster  于2020年3月18日周三 上午5:23写道:
> > >>
> > >> Hi:
> > >> When I upgraded from Flink 1.9.1 to Flink 1.10.0 I can't execute
> > >> programs at the Scala shell.
> > >>
> > >> It gives me an error that the REST address must be set. This looks
> > >> like it comes from HA but I don't have HA configured for Flink and it
> > >> was very hard to find this documented other than in the PR/JIRA in the
> > >> history so don't have much context. Can someone point me to how to
> > >> configure this properly? For reference, I put the example stacktrace
> > >> below.
> > >>
> > >> scala> val text = benv.fromElements("To be, or not to be,--that is the
> > >> question:--");
> > >> text: org.apache.flink.api.scala.DataSet[String] =
> > >> org.apache.flink.api.scala.DataSet@2396408a
> > >>
> > >> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+")}.map {
> > >> (_, 1) }.groupBy(0).sum(1);
> > >> counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] =
> > >> org.apache.flink.api.scala.AggregateDataSet@38bce2ed
> > >>
> > >> scala> counts.print()
> > >> 20/03/17 21:15:34 INFO java.ExecutionEnvironment: The job has 0
> > >> registered types and 0 default Kryo serializers
> > >> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
> > >> configuration property: env.yarn.conf.dir, /etc/hadoop/conf
> > >> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
> > >> configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
> > >> java.lang.RuntimeException: Couldn't retrieve standalone cluster
> > >>   at
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:53)
> > >>   at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:64)
> > >>   at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:944)
> > >>   at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:860)
> > >>   at
> org.apache.flink.api.java.ScalaShellEnvironment.execute(ScalaShellEnvironment.java:81)
> > >>   at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:844)
> > >>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> > >>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
> > >>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
> > >>   ... 30 elided
> > >> Caused by: java.lang.NullPointerException: rest.address must be set
> > >>   at
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
> > >>   at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:196)
> > >>   at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createClientHAService(HighAvailabilityServicesUtils.java:146)
> > >>   at
> org.apache.flink.client.program.rest.RestClusterClient.(RestClusterClient.java:161)
> > >>   at
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:51)
> > >>   ... 38 more
> > >
> > >
> > >
> > > --
> > > Best Regards
> > >
> > > Jeff Zhang
>


-- 
Best Regards

Jeff Zhang


Re: Scala Shell gives error "rest.address must be set"

2020-03-17 Thread Jeff Zhang
It looks like you are running under standalone mode, what is
your command to start scala shell. ?

Craig Foster  于2020年3月18日周三 上午5:23写道:

> Hi:
> When I upgraded from Flink 1.9.1 to Flink 1.10.0 I can't execute
> programs at the Scala shell.
>
> It gives me an error that the REST address must be set. This looks
> like it comes from HA but I don't have HA configured for Flink and it
> was very hard to find this documented other than in the PR/JIRA in the
> history so don't have much context. Can someone point me to how to
> configure this properly? For reference, I put the example stacktrace
> below.
>
> scala> val text = benv.fromElements("To be, or not to be,--that is the
> question:--");
> text: org.apache.flink.api.scala.DataSet[String] =
> org.apache.flink.api.scala.DataSet@2396408a
>
> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+")}.map {
> (_, 1) }.groupBy(0).sum(1);
> counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] =
> org.apache.flink.api.scala.AggregateDataSet@38bce2ed
>
> scala> counts.print()
> 20/03/17 21:15:34 INFO java.ExecutionEnvironment: The job has 0
> registered types and 0 default Kryo serializers
> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
> configuration property: env.yarn.conf.dir, /etc/hadoop/conf
> 20/03/17 21:15:34 INFO configuration.GlobalConfiguration: Loading
> configuration property: env.hadoop.conf.dir, /etc/hadoop/conf
> java.lang.RuntimeException: Couldn't retrieve standalone cluster
>   at
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:53)
>   at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:64)
>   at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:944)
>   at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:860)
>   at
> org.apache.flink.api.java.ScalaShellEnvironment.execute(ScalaShellEnvironment.java:81)
>   at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:844)
>   at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
>   at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
>   at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1864)
>   ... 30 elided
> Caused by: java.lang.NullPointerException: rest.address must be set
>   at
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
>   at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:196)
>   at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createClientHAService(HighAvailabilityServicesUtils.java:146)
>   at
> org.apache.flink.client.program.rest.RestClusterClient.(RestClusterClient.java:161)
>   at
> org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:51)
>   ... 38 more
>


-- 
Best Regards

Jeff Zhang


Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jeff Zhang
There are 2 kinds of configuration: job level & cluster level. I am afraid we
don't have documentation to differentiate them; it depends on how users
understand these configurations. We may need to improve the documentation on that.
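
To illustrate the distinction (a sketch only; the option names below are just examples taken from the Table/SQL and memory configuration docs, not an exhaustive list): job-level options can be set from user code via TableConfig, while process-level options only take effect when the cluster itself is started.

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)

// Job level: can be overridden per job/query from the client side.
tEnv.getConfig.getConfiguration.setString("table.exec.mini-batch.enabled", "true")
tEnv.getConfig.getConfiguration.setString("table.exec.mini-batch.allow-latency", "5 s")
tEnv.getConfig.getConfiguration.setString("table.exec.mini-batch.size", "5000")

// Cluster level: options such as taskmanager.memory.process.size or the slot count
// belong to flink-conf.yaml and are fixed once the session cluster is running.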

Kurt Young wrote on Fri, Mar 6, 2020 at 8:34 AM:

> If you already have a running flink cluster and you want submit another
> job to this cluster, then all the configurations
> relates to process parameters like TM memory, slot number etc are not be
> able to modify.
>
> Best,
> Kurt
>
>
> On Thu, Mar 5, 2020 at 11:08 PM Gyula Fóra  wrote:
>
>> Kurt can you please explain which conf parameters do you mean?
>>
>> In regular executions (Yarn for instance) we  have dynamic config
>> parameters overriding any flink-conf argument.
>> So it is not about setting them in the user code but it should happen
>> before the ClusterDescriptors are created (ie in the together with the
>> CustomCommandLine logic)
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 3:49 PM Kurt Young  wrote:
>>
>>> IIRC the tricky thing here is not all the config options belong to
>>> flink-conf.yaml can be adjust dynamically in user's program.
>>> So it will end up like some of the configurations can be overridden but
>>> some are not. The experience is not quite good for users.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Thu, Mar 5, 2020 at 10:15 PM Jeff Zhang  wrote:
>>>
>>>> Hi Gyula,
>>>>
>>>> I am doing integration Flink with Zeppelin. One feature in Zeppelin is
>>>> that user could override any features in flink-conf.yaml. (Actually any
>>>> features here
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html).
>>>> Of course you can run flink sql in Zeppelin, and could also leverage other
>>>> features of Zeppelin, like visualization.
>>>>
>>>> If you are interested, you could try the master branch of Zeppelin +
>>>> this improvement PR
>>>>
>>>> https://github.com/apache/zeppelin
>>>> https://github.com/apache/zeppelin/pull/3676
>>>> https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Gyula Fóra  于2020年3月5日周四 下午6:51写道:
>>>>
>>>>> I could basically list a few things I want to set (execution.target
>>>>> for example), but it's fair to assume that I would like to be able to set
>>>>> anything :)
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li 
>>>>> wrote:
>>>>>
>>>>>> Hi Gyula,
>>>>>>
>>>>>> Maybe Blink planner has invoked
>>>>>> "StreamExecutionEnvironment.configure", which planner do you use?
>>>>>>
>>>>>> But "StreamExecutionEnvironment.configure" is only for partial
>>>>>> configuration, can not for all configuration in flink-conf.yaml.
>>>>>> So what's the config do you want to set? I know some config like
>>>>>> "taskmanager.network.blocking-shuffle.compression.enabled" can not 
>>>>>> set
>>>>>>
>>>>>> Best,
>>>>>> Jingsong Lee
>>>>>>
>>>>>> On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:
>>>>>>
>>>>>>> Hi Gyula,
>>>>>>>
>>>>>>> Flink configurations can be overrided via
>>>>>>> `TableConfig#getConfiguration()`, however, SQL CLI only allows to set 
>>>>>>> Table
>>>>>>> specific configs.
>>>>>>> I will think it as a bug/improvement of SQL CLI which should be
>>>>>>> fixed in 1.10.1.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> On Thu, 5 Mar 2020 at 18:12, Gyula Fóra 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Caizhi,
>>>>>>>>
>>>>>>>> This seems like a pretty big shortcoming for any
>>>>>>>> multi-user/multi-app environment. I will open a jira for this.
>>>>>>>>
>>>>>>>> Gyula
>>>>>>>>
>>

Re: How to override flink-conf parameters for SQL Client

2020-03-05 Thread Jeff Zhang
Hi Gyula,

I am working on integrating Flink with Zeppelin. One feature in Zeppelin is that
users can override any setting in flink-conf.yaml (actually any setting listed here:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html).
Of course you can run Flink SQL in Zeppelin, and you can also leverage other
features of Zeppelin, like visualization.

If you are interested, you could try the master branch of Zeppelin + this
improvement PR

https://github.com/apache/zeppelin
https://github.com/apache/zeppelin/pull/3676
https://github.com/apache/zeppelin/blob/master/docs/interpreter/flink.md






Gyula Fóra wrote on Thu, Mar 5, 2020 at 6:51 PM:

> I could basically list a few things I want to set (execution.target for
> example), but it's fair to assume that I would like to be able to set
> anything :)
>
> Gyula
>
> On Thu, Mar 5, 2020 at 11:35 AM Jingsong Li 
> wrote:
>
>> Hi Gyula,
>>
>> Maybe Blink planner has invoked "StreamExecutionEnvironment.configure",
>> which planner do you use?
>>
>> But "StreamExecutionEnvironment.configure" is only for partial
>> configuration, can not for all configuration in flink-conf.yaml.
>> So what's the config do you want to set? I know some config like
>> "taskmanager.network.blocking-shuffle.compression.enabled" can not set
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Mar 5, 2020 at 6:17 PM Jark Wu  wrote:
>>
>>> Hi Gyula,
>>>
>>> Flink configurations can be overrided via
>>> `TableConfig#getConfiguration()`, however, SQL CLI only allows to set Table
>>> specific configs.
>>> I will think it as a bug/improvement of SQL CLI which should be fixed in
>>> 1.10.1.
>>>
>>> Best,
>>> Jark
>>>
>>> On Thu, 5 Mar 2020 at 18:12, Gyula Fóra  wrote:
>>>
>>>> Thanks Caizhi,
>>>>
>>>> This seems like a pretty big shortcoming for any multi-user/multi-app
>>>> environment. I will open a jira for this.
>>>>
>>>> Gyula
>>>>
>>>> On Thu, Mar 5, 2020 at 10:58 AM Caizhi Weng 
>>>> wrote:
>>>>
>>>>> Hi Gyula.
>>>>>
>>>>> I'm afraid there is no way to override all Flink configurations
>>>>> currently. SQL client yaml file can only override some of the Flink
>>>>> configurations.
>>>>>
>>>>> Configuration entries indeed can only set Table specific configs,
>>>>> while deployment entires are used to set the result fetching address and
>>>>> port. There is currently no way to change the execution target from the 
>>>>> SQL
>>>>> client.
>>>>>
>>>>> Gyula Fóra  于2020年3月5日周四 下午4:31写道:
>>>>>
>>>>>> Hi All!
>>>>>>
>>>>>> I am trying to understand if there is any way to override flink
>>>>>> configuration parameters when starting the SQL Client.
>>>>>>
>>>>>> It seems that the only way to pass any parameters is through the
>>>>>> environment yaml.
>>>>>>
>>>>>> There I found 2 possible routes:
>>>>>>
>>>>>> configuration: this doesn't work as it only sets Table specific
>>>>>> configs apparently, but maybe I am wrong.
>>>>>>
>>>>>> deployment: I tried using dynamic properties options here but
>>>>>> unfortunately we normalize (lowercase) the YAML keys so it is impossible 
>>>>>> to
>>>>>> pass options like -yD or -D.
>>>>>>
>>>>>> Does anyone have any suggestions?
>>>>>>
>>>>>> Thanks
>>>>>> Gyula
>>>>>>
>>>>>
>>
>> --
>> Best, Jingsong Lee
>>
>

-- 
Best Regards

Jeff Zhang


Re: SHOW CREATE TABLE in Flink SQL

2020-03-02 Thread Jeff Zhang
+1 for this. Maybe we can add 'describe extended table' like Hive.

Gyula Fóra wrote on Mon, Mar 2, 2020 at 8:49 PM:

> Hi All!
>
> I am looking for the functionality to show how a table was created or show
> all the properties (connector, etc.)
>
> I could only find DESCRIBE at this point which only shows the schema.
>
> Is there anything similar to "SHOW CREATE TABLE" or is this something that
> we should maybe add in the future?
>
> Thank you!
> Gyula
>


-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-20 Thread Jeff Zhang
Congratulations, Jingsong! You deserve it.

wenlong.lwl wrote on Fri, Feb 21, 2020 at 11:43 AM:

> Congrats Jingsong!
>
> On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:
>
> > Congrats Jingsong!
> >
> > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
> > >
> > > Congratulations Jingsong! Well deserved.
> > >
> > > Best,
> > > Jark
> > >
> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
> > >
> > >> Congratulations! Jingsong
> > >>
> > >>
> > >> Best,
> > >> Dan Zou
> > >>
> >
> >
>


-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread Jeff Zhang
Congratulations! Really appreciate your hard work.

Yangze Guo wrote on Thu, Feb 13, 2020 at 9:29 AM:

> Thanks, Gary & Yu. Congrats to everyone involved!
>
> Best,
> Yangze Guo
>
> On Thu, Feb 13, 2020 at 9:23 AM Jingsong Li 
> wrote:
> >
> > Congratulations! Great work.
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, Feb 12, 2020 at 11:05 PM Leonard Xu  wrote:
> >>
> >> Great news!
> >> Thanks everyone involved !
> >> Thanks Gary and Yu for being the release manager !
> >>
> >> Best,
> >> Leonard Xu
> >>
> >> 在 2020年2月12日,23:02,Stephan Ewen  写道:
> >>
> >> Congrats to us all.
> >>
> >> A big piece of work, nicely done.
> >>
> >> Let's hope that this helps our users make their existing use cases
> easier and also opens up new use cases.
> >>
> >> On Wed, Feb 12, 2020 at 3:31 PM 张光辉  wrote:
> >>>
> >>> Greet work.
> >>>
> >>> Congxian Qiu  于2020年2月12日周三 下午10:11写道:
> >>>>
> >>>> Great work.
> >>>> Thanks everyone involved.
> >>>> Thanks Gary and Yu for being the release manager
> >>>>
> >>>>
> >>>> Best,
> >>>> Congxian
> >>>>
> >>>>
> >>>> Jark Wu  于2020年2月12日周三 下午9:46写道:
> >>>>>
> >>>>> Congratulations to everyone involved!
> >>>>> Great thanks to Yu & Gary for being the release manager!
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>> On Wed, 12 Feb 2020 at 21:42, Zhu Zhu  wrote:
> >>>>>>
> >>>>>> Cheers!
> >>>>>> Thanks Gary and Yu for the great job as release managers.
> >>>>>> And thanks to everyone whose contribution makes the release
> possible!
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Zhu Zhu
> >>>>>>
> >>>>>> Wyatt Chun  于2020年2月12日周三 下午9:36写道:
> >>>>>>>
> >>>>>>> Sounds great. Congrats & Thanks!
> >>>>>>>
> >>>>>>> On Wed, Feb 12, 2020 at 9:31 PM Yu Li  wrote:
> >>>>>>>>
> >>>>>>>> The Apache Flink community is very happy to announce the release
> of Apache Flink 1.10.0, which is the latest major release.
> >>>>>>>>
> >>>>>>>> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
> >>>>>>>>
> >>>>>>>> The release is available for download at:
> >>>>>>>> https://flink.apache.org/downloads.html
> >>>>>>>>
> >>>>>>>> Please check out the release blog post for an overview of the
> improvements for this new major release:
> >>>>>>>> https://flink.apache.org/news/2020/02/11/release-1.10.0.html
> >>>>>>>>
> >>>>>>>> The full release notes are available in Jira:
> >>>>>>>>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845
> >>>>>>>>
> >>>>>>>> We would like to thank all contributors of the Apache Flink
> community who made this release possible!
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Gary & Yu
> >>
> >>
> >
> >
> > --
> > Best, Jingsong Lee
>


-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread Jeff Zhang
+1


Xingbo Huang wrote on Tue, Feb 4, 2020 at 1:07 PM:

> Hi Jincheng,
>
> Thanks for driving this.
> +1 for this proposal.
>
> Compared to building from source, downloading directly from PyPI will
> greatly save the development cost of Python users.
>
> Best,
> Xingbo
>
>
>
> Wei Zhong  于2020年2月4日周二 下午12:43写道:
>
>> Hi Jincheng,
>>
>> Thanks for bring up this discussion!
>>
>> +1 for this proposal. Building from source takes long time and requires a
>> good network environment. Some users may not have such an environment.
>> Uploading to PyPI will greatly improve the user experience.
>>
>> Best,
>> Wei
>>
>> jincheng sun  于2020年2月4日周二 上午11:49写道:
>>
>>> Hi folks,
>>>
>>> I am very happy to receive some user inquiries about the use of Flink
>>> Python API (PyFlink) recently. One of the more common questions is
>>> whether
>>> it is possible to install PyFlink without using source code build. The
>>> most
>>> convenient and natural way for users is to use `pip install
>>> apache-flink`.
>>> We originally planned to support the use of `pip install apache-flink` in
>>> Flink 1.10, but the reason for this decision was that when Flink 1.9 was
>>> released at August 22, 2019[1], Flink's PyPI account system was not
>>> ready.
>>> At present, our PyPI account is available at October 09, 2019 [2](Only
>>> PMC
>>> can access), So for the convenience of users I propose:
>>>
>>> - Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
>>> - Update Flink 1.9 documentation to add support for `pip install`.
>>>
>>> As we all know, Flink 1.9.2 was just completed released at January 31,
>>> 2020
>>> [3]. There is still at least 1 to 2 months before the release of 1.9.3,
>>> so
>>> my proposal is completely considered from the perspective of user
>>> convenience. Although the proposed work is not large, we have not set a
>>> precedent for independent release of the Flink Python API(PyFlink) in the
>>> previous release process. I hereby initiate the current discussion and
>>> look
>>> forward to your feedback!
>>>
>>> Best,
>>> Jincheng
>>>
>>> [1]
>>>
>>> https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
>>> [2]
>>>
>>> https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
>>> [3]
>>>
>>> https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E
>>>
>>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 Thread Jeff Zhang
Congrats Dian Fu !

jincheng sun wrote on Thu, Jan 16, 2020 at 5:58 PM:

> Hi everyone,
>
> I'm very happy to announce that Dian accepted the offer of the Flink PMC
> to become a committer of the Flink project.
>
> Dian Fu has been contributing to Flink for many years. Dian Fu played an
> essential role in PyFlink/CEP/SQL/Table API modules. Dian Fu has
> contributed several major features, reported and fixed many bugs, spent a
> lot of time reviewing pull requests and also frequently helping out on the
> user mailing lists and check/vote the release.
>
> Please join in me congratulating Dian for becoming a Flink committer !
>
> Best,
> Jincheng(on behalf of the Flink PMC)
>


-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] Set default planner for SQL Client to Blink planner in 1.10 release

2020-01-03 Thread Jeff Zhang
+1. I have already made Blink the default planner of the Flink interpreter
in Zeppelin.


Jingsong Li wrote on Fri, Jan 3, 2020 at 4:37 PM:

> Hi Jark,
>
> +1 for default blink planner in SQL-CLI.
> I believe this new planner can be put into practice in production.
> We've worked hard for nearly a year, but the old planner didn't move on.
>
> And I'd like to cc to user@flink.apache.org.
> If anyone finds that blink planner has any significant defects and has a
> larger regression than the old planner, please let us know. We will be very
> grateful.
>
> Best,
> Jingsong Lee
>
> On Fri, Jan 3, 2020 at 4:14 PM Leonard Xu  wrote:
>
>> +1 for this.
>> We bring many SQL/API features and enhance stability in 1.10 release, and
>> almost all of them happens in Blink planner.
>> SQL CLI is the most convenient entrypoint for me, I believe many users
>> will have a better experience If we set Blink planner as default planner.
>>
>> Best,
>> Leonard
>>
>> > 在 2020年1月3日,15:16,Terry Wang  写道:
>> >
>> > Since what blink planner can do is a superset of flink planner, big +1
>> for changing the default planner to Blink planner from my side.
>> >
>> > Best,
>> > Terry Wang
>> >
>> >
>> >
>> >> 2020年1月3日 15:00,Jark Wu  写道:
>> >>
>> >> Hi everyone,
>> >>
>> >> In 1.10 release, Flink SQL supports many awesome features and
>> improvements,
>> >> including:
>> >> - support watermark statement and computed column in DDL
>> >> - fully support all data types in Hive
>> >> - Batch SQL performance improvements (TPC-DS 7x than Hive MR)
>> >> - support INSERT OVERWRITE and INSERT PARTITION
>> >>
>> >> However, all the features and improvements are only avaiable in Blink
>> >> planner, not in Old planner.
>> >> There are also some other features are limited in Blink planner, e.g.
>> >> Dimension Table Join [1],
>> >> TopN [2], Deduplicate [3], streaming aggregates optimization [4], and
>> so on.
>> >>
>> >> But Old planner is still the default planner in Table API & SQL. It is
>> >> frustrating for users to set
>> >> to blink planner manually when every time start a SQL CLI. And it's
>> >> surprising to see unsupported
>> >> exception if they trying out the new features but not switch planner.
>> >>
>> >> SQL CLI is a very important entrypoint for trying out new feautures and
>> >> prototyping for users.
>> >> In order to give new planner more exposures, I would like to suggest
>> to set
>> >> default planner
>> >> for SQL Client to Blink planner before 1.10 release.
>> >>
>> >> The approach is just changing the default SQL CLI yaml
>> configuration[5]. In
>> >> this way, the existing
>> >> environment is still compatible and unaffected.
>> >>
>> >> Changing the default planner for the whole Table API & SQL is another
>> topic
>> >> and is out of scope of this discussion.
>> >>
>> >> What do you think?
>> >>
>> >> Best,
>> >> Jark
>> >>
>> >> [1]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>> >> [2]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#top-n
>> >> [3]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>> >> [4]:
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html
>> >> [5]:
>> >>
>> https://github.com/apache/flink/blob/master/flink-table/flink-sql-client/conf/sql-client-defaults.yaml#L100
>> >
>>
>>
>
> --
> Best, Jingsong Lee
>


-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] have separate Flink distributions with built-in Hive dependencies

2019-12-13 Thread Jeff Zhang
+1, this is definitely necessary for a better user experience. Setting up the
environment is always painful for many big data tools.



Bowen Li wrote on Fri, Dec 13, 2019 at 5:02 PM:

> cc user ML in case anyone want to chime in
>
> On Fri, Dec 13, 2019 at 00:44 Bowen Li  wrote:
>
>> Hi all,
>>
>> I want to propose to have a couple separate Flink distributions with Hive
>> dependencies on specific Hive versions (2.3.4 and 1.2.1). The distributions
>> will be provided to users on Flink download page [1].
>>
>> A few reasons to do this:
>>
>> 1) Flink-Hive integration is important to many many Flink and Hive users
>> in two dimensions:
>>  a) for Flink metadata: HiveCatalog is the only persistent catalog to
>> manage Flink tables. With Flink 1.10 supporting more DDL, the persistent
>> catalog would be playing even more critical role in users' workflow
>>  b) for Flink data: Hive data connector (source/sink) helps both
>> Flink and Hive users to unlock new use cases in streaming,
>> near-realtime/realtime data warehouse, backfill, etc.
>>
>> 2) currently users have to go thru a *really* tedious process to get
>> started, because it requires lots of extra jars (see [2]) that are absent
>> in Flink's lean distribution. We've had so many users from public mailing
>> list, private email, DingTalk groups who got frustrated on spending lots of
>> time figuring out the jars themselves. They would rather have a more "right
>> out of box" quickstart experience, and play with the catalog and
>> source/sink without hassle.
>>
>> 3) it's easier for users to replace those Hive dependencies for their own
>> Hive versions - just replace those jars with the right versions and no need
>> to find the doc.
>>
>> * Hive 2.3.4 and 1.2.1 are two versions that represent lots of user base
>> out there, and that's why we are using them as examples for dependencies in
>> [1] even though we've supported almost all Hive versions [3] now.
>>
>> I want to hear what the community think about this, and how to achieve it
>> if we believe that's the way to go.
>>
>> Cheers,
>> Bowen
>>
>> [1] https://flink.apache.org/downloads.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/#dependencies
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/#supported-hive-versions
>>
>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Jeff Zhang
Great work, Hequn

Dian Fu wrote on Thu, Dec 12, 2019 at 2:32 PM:

> Thanks Hequn for being the release manager and everyone who contributed to
> this release.
>
> Regards,
> Dian
>
> 在 2019年12月12日,下午2:24,Hequn Cheng  写道:
>
> Hi,
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.8.3, which is the third bugfix release for the Apache Flink 1.8
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12346112
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
> Great thanks to @Jincheng as a mentor during this release.
>
> Regards,
> Hequn
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Can flink 1.9.1 use flink-shaded 2.8.3-1.8.2

2019-10-25 Thread Jeff Zhang
Thanks Chesnay. Is there any document that explains which version of the
flink-shaded-hadoop jar I should use for a specific version of Flink?
E.g. the document for Flink 1.9 here
https://flink.apache.org/downloads.html#apache-flink-191 points me to
flink-shaded-hadoop jar 7.0, but the latest version of the
flink-shaded-hadoop jar is 8.0, so when should I use
flink-shaded-hadoop jar 8.0?

Another question: can flink-shaded-hadoop-2-uber 2.8.3-7.0 be used for
Hadoop 2.8.5 as well? I believe so, but I want to confirm it. And if it works
for all Hadoop 2.8.x, it may make more sense to omit the Hadoop minor version,
e.g. name it 2.8-7.0; otherwise it may confuse users.


Chesnay Schepler wrote on Fri, Oct 25, 2019 at 4:21 PM:

> If you need hadoop, but the approach outlined here
> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/projectsetup/dependencies.html>
> doesn't work for you, then you still need a flink-shaded-hadoop-jar that
> you can download here
> <https://flink.apache.org/downloads.html#apache-flink-191>.
>
> On 25/10/2019 09:54, Jeff Zhang wrote:
>
> Hi all,
>
> There's no new flink shaded release for flink 1.9, so I'd like to confirm
> with you can flink 1.9.1 use flink-shaded 2.8.3-1.8.2 ? Or the flink shaded
> is not necessary for flink 1.9 afterwards ?
>
> https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>

-- 
Best Regards

Jeff Zhang


Can flink 1.9.1 use flink-shaded 2.8.3-1.8.2

2019-10-25 Thread Jeff Zhang
Hi all,

There's no new flink-shaded release for Flink 1.9, so I'd like to confirm
with you: can Flink 1.9.1 use flink-shaded 2.8.3-1.8.2? Or is flink-shaded
no longer necessary from Flink 1.9 onwards?

https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Zili Chen becomes a Flink committer

2019-09-11 Thread Jeff Zhang
Congratulations Zili Chen!

Wesley Peng wrote on Wed, Sep 11, 2019 at 5:25 PM:

> Hi
>
> on 2019/9/11 17:22, Till Rohrmann wrote:
> > I'm very happy to announce that Zili Chen (some of you might also know
> > him as Tison Kun) accepted the offer of the Flink PMC to become a
> > committer of the Flink project.
>
> Congratulations Zili Chen.
>
> regards.
>


-- 
Best Regards

Jeff Zhang


Re: error: Static methods in interface require -target:jvm-1.8 using scala 2.11

2019-09-10 Thread Jeff Zhang
Add this to your scala-maven-plugin configuration:

<configuration>
  <args>
    <arg>-target:jvm-1.8</arg>
  </args>
</configuration>

Ben Yan wrote on Wed, Sep 11, 2019 at 12:07 PM:

> The following is the environment I use:
> 1. flink.version: 1.9.0
> 2. java version "1.8.0_212"
> 3. scala version: 2.11.12
>
> When I wrote the following code in the scala programming language, I found
> the following error:
>
> // set up the batch execution environment
> val bbSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
> val bbTableEnv = TableEnvironment.create(bbSettings)
>
> error: Static methods in interface require -target:jvm-1.8
> [ERROR] val bbTableEnv = TableEnvironment.create(bbSettings)
>
> But when I use the java programming language or the version of scala in 2.12, 
> there is no problem.
>
> If I use the version of scala2.11, is there any way to solve this problem? 
> thanks
>
>
> Best,
>
> Ben
>
>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC

2019-09-06 Thread Jeff Zhang
Congrats Klou!

Zili Chen wrote on Fri, Sep 6, 2019 at 9:51 PM:

> Congrats Klou!
>
> Best,
> tison.
>
>
> Till Rohrmann  于2019年9月6日周五 下午9:23写道:
>
>> Congrats Klou!
>>
>> Cheers,
>> Till
>>
>> On Fri, Sep 6, 2019 at 3:00 PM Dian Fu  wrote:
>>
>>> Congratulations Kostas!
>>>
>>> Regards,
>>> Dian
>>>
>>> > 在 2019年9月6日,下午8:58,Wesley Peng  写道:
>>> >
>>> > On 2019/9/6 8:55 下午, Fabian Hueske wrote:
>>> >> I'm very happy to announce that Kostas Kloudas is joining the Flink
>>> PMC.
>>> >> Kostas is contributing to Flink for many years and puts lots of
>>> effort in helping our users and growing the Flink community.
>>> >> Please join me in congratulating Kostas!
>>> >
>>> > congratulation Kostas!
>>> >
>>> > regards.
>>>
>>>

-- 
Best Regards

Jeff Zhang


Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-08-30 Thread Jeff Zhang
:205)
>>> at 
>>> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:400)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>> at 
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>> at 
>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at 
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>>
>>>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-07 Thread Jeff Zhang
Congrats Hequn!

Paul Lam wrote on Wed, Aug 7, 2019 at 5:08 PM:

> Congrats Hequn! Well deserved!
>
> Best,
> Paul Lam
>
> 在 2019年8月7日,16:28,jincheng sun  写道:
>
> Hi everyone,
>
> I'm very happy to announce that Hequn accepted the offer of the Flink PMC
> to become a committer of the Flink project.
>
> Hequn has been contributing to Flink for many years, mainly working on
> SQL/Table API features. He's also frequently helping out on the user
> mailing lists and helping check/vote the release.
>
> Congratulations Hequn!
>
> Best, Jincheng
> (on behalf of the Flink PMC)
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Error while running flink job on local environment

2019-07-30 Thread Jeff Zhang
@Andrey,

Although your approach will work, it requires the user to write different
code for local mode and other modes, which is inconvenient for users.
IMHO, we should not check these kinds of memory configurations in local mode,
or we should implicitly set the TM memory to a fairly large value in local mode
to avoid this kind of problem.
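
For reference, that local-mode workaround looks roughly like this (a sketch only; the config key comes from the error message below and the value is an arbitrary example, not a recommendation):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val conf = new Configuration()
// Give the local network stack a larger share of memory;
// taskmanager.network.memory.min/max from the error message can be raised the same way.
conf.setFloat("taskmanager.network.memory.fraction", 0.2f)

// This is exactly the local-mode-only code path that diverges from
// getExecutionEnvironment(), which is the inconvenience described above.
val env = StreamExecutionEnvironment.createLocalEnvironment(2, conf)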

Andrey Zagrebin wrote on Wed, Jul 31, 2019 at 1:32 AM:

> Hi Vinayak,
>
> the error message provides a hint about changing config options, you could
> try to use StreamExecutionEnvironment.createLocalEnvironment(2,
> customConfig); to increase resources.
> this issue might also address the problem, it will be part of 1.9 release:
> https://issues.apache.org/jira/browse/FLINK-12852
>
> Best,
> Andrey
>
> On Tue, Jul 30, 2019 at 5:42 PM Vinayak Magadum 
> wrote:
>
>> Hi,
>>
>> I am using Flink version: 1.7.1
>>
>> I have a flink job that gets the execution environment as below and
>> executes the job.
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> When I run the code in cluster, it runs fine. But on local machine while
>> running the job via IntelliJ I get below error:
>>
>> org.apache.flink.runtime.client.JobExecutionException: Job execution
>> failed.
>> at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>> at
>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
>> at
>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>> 
>> Caused by: java.io.IOException: Insufficient number of network buffers:
>> required 8, but only 3 available. The total number of network buffers is
>> currently set to 12851 of 32768 bytes each. You can increase this number by
>> setting the configuration keys 'taskmanager.network.memory.fraction',
>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>> at
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:272)
>> at
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257)
>> at
>> org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:278)
>> at
>> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Workaround that I tried to make it run on local is to use
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.createLocalEnvironment(2);
>>
>> instead of StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> With Flink 1.4.2, StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment(); used to work on both
>> cluster as well as local environment.
>>
>> Is there any way to make
>> StreamExecutionEnvironment.getExecutionEnvironment(); work in both cluster
>> and local mode in flink 1.7.1? Specifically how to make it work locally via
>> IntelliJ.
>> 
>> Thanks & Regards,
>> Vinayak
>>
>

-- 
Best Regards

Jeff Zhang


Re: Re: Flink and Akka version incompatibility ..

2019-07-24 Thread Jeff Zhang
I think it would be better to shade all of Flink's dependencies so that projects
that use Flink won't hit this kind of issue.
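
Until that happens, shading Akka on the application side is the practical workaround. Assuming the job is built with sbt, a sketch with sbt-assembly could look like this (the relocated package prefix is arbitrary, and handling of Akka's reference.conf and reflective class-name settings is left out, so this is not a complete recipe):

// build.sbt (sbt-assembly)
assemblyShadeRules in assembly := Seq(
  // Relocate the application's Akka 2.5 classes so they cannot clash with Flink's Akka 2.4.
  ShadeRule.rename("akka.**" -> "shadedakka.@1").inAll
)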


Haibo Sun wrote on Wed, Jul 24, 2019 at 4:07 PM:

> Hi,   Debasish Ghosh
>
> I don't know why not shade Akka, maybe it can be shaded. Chesnay may be
> able to answer that.
> I recommend to shade Akka dependency of your application because it don't
> be known what's wrong with shading Flink's Akka.
>
> CC  @Chesnay Schepler
>
> Best,
> Haibo
>
> At 2019-07-24 15:48:59, "Debasish Ghosh"  wrote:
>
> The problem that I am facing is with Akka serialization .. Why not shade
> the whole of Akka ?
>
> java.lang.AbstractMethodError:
>> akka.remote.RemoteActorRefProvider.serializationInformation()Lakka/serialization/Serialization$Information;
>> at
>> akka.serialization.Serialization.serializationInformation(Serialization.scala:166)
>
>
> Akka 2.6 is just around the corner and I don't think Flink will upgrade to
> Akka 2.6 that soon .. so somehow this problem is bound to recur ..
>
> regards.
>
> On Wed, Jul 24, 2019 at 1:01 PM Zili Chen  wrote:
>
>> I can see that we relocate akka's netty, akka uncommon math but also
>> be curious why Flink doesn't shaded all of akka dependencies...
>>
>> Best,
>> tison.
>>
>>
>> Debasish Ghosh  于2019年7月24日周三 下午3:15写道:
>>
>>> Hello Haibo -
>>>
>>> Yes, my application depends on Akka 2.5.
>>> Just curious, why do you think it's recommended to shade Akka version of
>>> my application instead of Flink ?
>>>
>>> regards.
>>>
>>> On Wed, Jul 24, 2019 at 12:42 PM Haibo Sun  wrote:
>>>
>>>> Hi  Debasish Ghosh,
>>>>
>>>> Does your application have to depend on Akka 2.5? If not, it's a good
>>>> idea to always keep the Akka version that the application depend on in line
>>>> with Flink.
>>>> If you want to try shading Akka dependency, I think that it is more
>>>> recommended to shade Akka dependency of your application.
>>>>
>>>> Best,
>>>> Haibo
>>>>
>>>> At 2019-07-24 14:31:29, "Debasish Ghosh" 
>>>> wrote:
>>>>
>>>> Hello -
>>>>
>>>> An application that uses Akka 2.5 and Flink 1.8.0 gives runtime errors
>>>> because of version mismatch between Akka that we use and the one that Flink
>>>> uses (which is Akka 2.4). Anyone tried shading Akka dependency with Flink ?
>>>>
>>>> Or is there any other alternative way to handle this issue ? I know
>>>> Flink 1.9 has upgraded to Akka 2.5 but this is (I think) going to be a
>>>> recurring problem down the line with mismatch between the new releases of
>>>> Akka and Flink.
>>>>
>>>> regards.
>>>>
>>>> --
>>>> Debasish Ghosh
>>>> http://manning.com/ghosh2
>>>> http://manning.com/ghosh
>>>>
>>>> Twttr: @debasishg
>>>> Blog: http://debasishg.blogspot.com
>>>> Code: http://github.com/debasishg
>>>>
>>>>
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
>

-- 
Best Regards

Jeff Zhang


Re: [SURVEY] How many people implement Flink job based on the interface Program?

2019-07-23 Thread Jeff Zhang
IIUC, the list of jobs contained in the jar means the jobs you defined in the
pipeline. Then I don't think it is Flink's responsibility to maintain the
job list info; it is the job scheduler that defines the pipeline, so the job
scheduler should maintain the job list.



Flavio Pompermaier wrote on Tue, Jul 23, 2019 at 5:23 PM:

> The jobs are somehow related to each other in the sense that we have a
> configurable pipeline where there are optional steps you can enable/disable
> (and thus we create a single big jar).
> Because of this, we have our application REST service that actually works
> also as a job scheduler and use the job server as a proxy towards Flink:
> when one steps ends (this is what is signalled back after the env.execute()
> from Flink to the application REST service) our application tells the job
> server to execute the next job of the pipeline on the cluster.
> Of course this is a "dirty" solution (because we should user a workflow
> scheduler like Airflow or Luigi or similar) but we wanted to keep things as
> simplest as possible for the moment.
> In the future, if our customers would ever improve this part, we will
> integrate our application with a dedicated job scheduler like the one
> listed before (probably)..I don't know if some of them are nowadays already
> integrated with Flink..when we started coding our frontend application (2
> ears ago) none of them were using it.
>
> Best,
> Flavio
>
> On Tue, Jul 23, 2019 at 10:40 AM Jeff Zhang  wrote:
>
>> Thanks Flavio,
>>
>> I get most of your points except one
>>
>>- Get the list of jobs contained in jar (ideally this is is true for
>>every engine beyond Spark or Flink)
>>
>> Just curious to know how you submit job via rest api, if there're
>> multiple jobs in one jar, then do you need to submit jar one time and
>> submit jobs multiple times ?
>> And is there any relationship between these jobs in the same jar ?
>>
>>
>>
>> Flavio Pompermaier  于2019年7月23日周二 下午4:01写道:
>>
>>> Hi Jeff, the thing about the manifest is really about to have a way to
>>> list multiple main classes in the jart (without the need to inspect every
>>> Java class or forcing a 1-to-1 between jar and job like it is now).
>>> My requirements were driven by the UI we're using in our framework:
>>>
>>>- Get the list of jobs contained in jar (ideally this is is true for
>>>every engine beyond Spark or Flink)
>>>- Get the list of required/optional parameters for each job
>>>- Besides the optionality of a parameter, each parameter should
>>>include an help description, a type (to validate the input param), a
>>>default value and a set of choices (when there's a limited number of
>>>options available)
>>>- obviously the job serve should be able to
>>>submit/run/cancel/monitor a job and upload/delete the uploaded jars
>>>- the job server should not depend on any target platform dependency
>>>(Spark or Flink) beyond the rest client: at the moment the rest client
>>>requires a lot of core libs (indeed because it needs to submit the job
>>>graph/plan)
>>>- in our vision, the flink client should be something like Apache
>>>    Livy (https://livy.apache.org/)
>>>- One of the biggest  limitations we face when running a Flink job
>>>from the REST API is the fact that the job can't do anything after
>>>env.execute() while we need to call an external service to signal that 
>>> the
>>>job has ended + some other details
>>>
>>> Best,
>>> Flavio
>>>
>>> On Tue, Jul 23, 2019 at 3:44 AM Jeff Zhang  wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> Based on the discussion in the tickets you mentioned above, the
>>>> program-class attribute was a mistake and community is intended to use
>>>> main-class to replace it.
>>>>
>>>> Deprecating Program interface is a part of work of flink new client
>>>> api.
>>>> IIUC, your requirements are not so complicated. We can implement that
>>>> in the new flink client api. How about listing your requirement, and let's
>>>> discuss how we can make it in the new flink client api. BTW, I guess most
>>>> of your requirements are based on your flink job server, It would be
>>>> helpful if you could provide more info about your flink job server. Thanks
>>>>
>>>>
>>>>
>>>> Flavio Pompermaier  于2019年7月22日周一 下午8:59写道:
>>

Re: [SURVEY] How many people implement Flink job based on the interface Program?

2019-07-23 Thread Jeff Zhang
Thanks Flavio,

I get most of your points except one:

   - Get the list of jobs contained in a jar (ideally this is true for
   every engine beyond Spark or Flink)

Just curious to know how you submit a job via the REST API: if there are multiple
jobs in one jar, do you need to submit the jar one time and submit the jobs
multiple times? And is there any relationship between these jobs in the same jar?



Flavio Pompermaier wrote on Tue, Jul 23, 2019 at 4:01 PM:

> Hi Jeff, the thing about the manifest is really about to have a way to
> list multiple main classes in the jart (without the need to inspect every
> Java class or forcing a 1-to-1 between jar and job like it is now).
> My requirements were driven by the UI we're using in our framework:
>
>- Get the list of jobs contained in jar (ideally this is is true for
>every engine beyond Spark or Flink)
>- Get the list of required/optional parameters for each job
>- Besides the optionality of a parameter, each parameter should
>include an help description, a type (to validate the input param), a
>default value and a set of choices (when there's a limited number of
>options available)
>- obviously the job serve should be able to submit/run/cancel/monitor
>a job and upload/delete the uploaded jars
>- the job server should not depend on any target platform dependency
>(Spark or Flink) beyond the rest client: at the moment the rest client
>requires a lot of core libs (indeed because it needs to submit the job
>graph/plan)
>- in our vision, the flink client should be something like Apache Livy
>(https://livy.apache.org/)
>- One of the biggest  limitations we face when running a Flink job
>from the REST API is the fact that the job can't do anything after
>env.execute() while we need to call an external service to signal that the
>    job has ended + some other details
>
> Best,
> Flavio
>
> On Tue, Jul 23, 2019 at 3:44 AM Jeff Zhang  wrote:
>
>> Hi Flavio,
>>
>> Based on the discussion in the tickets you mentioned above, the
>> program-class attribute was a mistake and community is intended to use
>> main-class to replace it.
>>
>> Deprecating Program interface is a part of work of flink new client api.
>> IIUC, your requirements are not so complicated. We can implement that in
>> the new flink client api. How about listing your requirement, and let's
>> discuss how we can make it in the new flink client api. BTW, I guess most
>> of your requirements are based on your flink job server, It would be
>> helpful if you could provide more info about your flink job server. Thanks
>>
>>
>>
>> Flavio Pompermaier  于2019年7月22日周一 下午8:59写道:
>>
>>> Hi Tison,
>>> we use a modified version of the Program interface to enable a web UI do
>>> properly detect and run Flink jobs contained in a jar + their parameters.
>>> As stated in [1], we dected multiple Main classes per jar by handling an
>>> extra comma-separeted Manifest entry (i.e. 'Main-classes').
>>>
>>> As mentioned on the discussion on the dev ML, our revised Program
>>> interface looks like this:
>>>
>>> public interface FlinkJob {
>>>   String getDescription();
>>>   List getParameters();
>>>   boolean isStreamingOrBatch();
>>> }
>>>
>>> public class FlinkJobParameter {
>>>   private String paramName;
>>>   private String paramType = "string";
>>>   private String paramDesc;
>>>   private String paramDefaultValue;
>>>   private Set choices;
>>>   private boolean mandatory;
>>> }
>>>
>>> I've also opened some JIRA issues related to this topic:
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-10864
>>> [2] https://issues.apache.org/jira/browse/FLINK-10862
>>> [3] https://issues.apache.org/jira/browse/FLINK-10879.
>>>
>>> Best,
>>> Flavio
>>>
>>>
>>> On Mon, Jul 22, 2019 at 1:46 PM Zili Chen  wrote:
>>>
>>>> Hi guys,
>>>>
>>>> We want to have an accurate idea of how many people are implementing
>>>> Flink job based on the interface Program, and how they actually
>>>> implement it.
>>>>
>>>> The reason I ask for the survey is from this thread[1] where we notice
>>>> this codepath is stale and less useful than it should be. As it is an
>>>> interface marked as @PublicEvolving it is originally aimed at serving
>>>> as user interface. Thus before doing deprecation or dropping, we'd like
>>>> to see if there are users implementing their job based on this
>>>> interface(org.apache.flink.api.common.Program) and if there is any,
>>>> we are curious about how it is used.
>>>>
>>>> If little or none of Flink user based on this interface, we would
>>>> propose deprecating or dropping it.
>>>>
>>>> I really appreciate your time and your insight.
>>>>
>>>> Best,
>>>> tison.
>>>>
>>>> [1]
>>>> https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E
>>>>
>>>
>>>
>>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
>

-- 
Best Regards

Jeff Zhang


Re: [SURVEY] How many people implement Flink job based on the interface Program?

2019-07-22 Thread Jeff Zhang
Hi Flavio,

Based on the discussion in the tickets you mentioned above, the
program-class attribute was a mistake and the community intends to use
main-class to replace it.

Deprecating the Program interface is part of the work on the new Flink client API.
IIUC, your requirements are not so complicated, and we can implement them in
the new Flink client API. How about listing your requirements, and let's
discuss how we can support them in the new client API. BTW, I guess most
of your requirements are based on your Flink job server, so it would be
helpful if you could provide more info about your Flink job server. Thanks



Flavio Pompermaier wrote on Mon, Jul 22, 2019 at 8:59 PM:

> Hi Tison,
> we use a modified version of the Program interface to enable a web UI do
> properly detect and run Flink jobs contained in a jar + their parameters.
> As stated in [1], we dected multiple Main classes per jar by handling an
> extra comma-separeted Manifest entry (i.e. 'Main-classes').
>
> As mentioned on the discussion on the dev ML, our revised Program
> interface looks like this:
>
> public interface FlinkJob {
>   String getDescription();
>   List getParameters();
>   boolean isStreamingOrBatch();
> }
>
> public class FlinkJobParameter {
>   private String paramName;
>   private String paramType = "string";
>   private String paramDesc;
>   private String paramDefaultValue;
>   private Set choices;
>   private boolean mandatory;
> }
>
> I've also opened some JIRA issues related to this topic:
>
> [1] https://issues.apache.org/jira/browse/FLINK-10864
> [2] https://issues.apache.org/jira/browse/FLINK-10862
> [3] https://issues.apache.org/jira/browse/FLINK-10879.
>
> Best,
> Flavio
>
>
> On Mon, Jul 22, 2019 at 1:46 PM Zili Chen  wrote:
>
>> Hi guys,
>>
>> We want to have an accurate idea of how many people are implementing
>> Flink jobs based on the interface Program, and how they actually
>> implement them.
>>
>> The reason I ask for this survey is this thread[1], where we noticed
>> that this codepath is stale and less useful than it should be. As it is an
>> interface marked as @PublicEvolving, it was originally aimed at serving
>> as a user interface. Thus, before deprecating or dropping it, we'd like
>> to see if there are users implementing their jobs based on this
>> interface (org.apache.flink.api.common.Program) and, if there are any,
>> we are curious about how it is used.
>>
>> If few or no Flink users rely on this interface, we would
>> propose deprecating or dropping it.
>>
>> I really appreciate your time and your insight.
>>
>> Best,
>> tison.
>>
>> [1]
>> https://lists.apache.org/thread.html/7ffc9936a384b891dbcf0a481d26c6d13b2125607c200577780d1e18@%3Cdev.flink.apache.org%3E
>>
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Jython support for Flink

2019-07-19 Thread Jeff Zhang
Hi Dante,

Flink 1.9 supports a python api, which may be what you want. See
https://ci.apache.org/projects/flink/flink-docs-master/tutorials/python_table_api.html


On Fri, Jul 19, 2019 at 10:40 PM Dante Van den Broeke  wrote:

> Dear,
>
>
> I'm a student currently working on a project involving apache kafka and
> flink. The project itself revolves around path prediction and machine
> learning for websites. To test a proof of concept I set up a kafka server
> locally (the goal is to expand this to a google cloud server or similar later)
> and a kafka producer (written in a java intelliJ idea project). The producer
> would send JSON data (currently just a local file but later json data from
> the website itself) to a flink-kafka connection, and the data transformation
> (key-windowed by user id) would then happen in the flink framework.
>
>
> The problem I'm facing however is that I wrote all the algorithms for
> transformation of the data in python, and I'm struggling with initializing a
> jython environment to set up the flink-kafka connection.
>
> I was wondering whether there is a working example for this setup or
> some documentation regarding the framework, as I'm struggling to find a lot
> of documentation for my application online.
>
>
> thanks in advance.
>
>
> kind regards,
>
> Dante Van den Broeke
>
>

-- 
Best Regards

Jeff Zhang


Re: user test can't run flink-1.8.1 wordcount

2019-07-12 Thread Jeff Zhang
You need to provide more details (like how it doesn't work); otherwise it
is difficult for people to help you.



On Fri, Jul 12, 2019 at 3:52 PM ❤ <799326...@qq.com> wrote:

> Dear all,
> I am having some issues with a Flink 1.8.1 HA cluster: when a user outside
> the cluster uses the CLI client to run test cases, the command
> "flink run WordCounter.jar" doesn't work. So
> could you give me some successful examples, please?
>
>
> Thanks!
>


-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread Jeff Zhang
Congrats, Rong!


On Fri, Jul 12, 2019 at 10:08 AM vino yang  wrote:

> congratulations Rong Rong!
>
> On Thu, Jul 11, 2019 at 10:25 PM Fabian Hueske  wrote:
>
>> Hi everyone,
>>
>> I'm very happy to announce that Rong Rong accepted the offer of the Flink
>> PMC to become a committer of the Flink project.
>>
>> Rong has been contributing to Flink for many years, mainly working on SQL
>> and Yarn security features. He's also frequently helping out on the
>> user@f.a.o mailing lists.
>>
>> Congratulations Rong!
>>
>> Best, Fabian
>> (on behalf of the Flink PMC)
>>
>

-- 
Best Regards

Jeff Zhang


Re: Apache Flink - Running application with different flink configurations (flink-conf.yml) on EMR

2019-06-28 Thread Jeff Zhang
This is because flink doesn't unify the execution across different environments.
The community has discussed before how to enhance the flink client
api. The initial proposal is to introduce a FlinkConf which contains all the
configuration so that we can unify the executions in all environments (IDE,
CLI, SQL Client, Scala Shell, downstream projects).

Here's the sample code:

val conf = new FlinkConf().setProperty("key_1", "value_1") // create
FlinkConf

val env = new ExecutionEnvironment(conf)   // create ExecutionEnvironment

val jobId = env.submit(...)   // non-blocking job submission
(detached mode)

val jobStatus = env.getClusterClient().queryJobStatus(jobId)   // approach
1: query job status via ClusterClient

val jobStatus = env.queryJobStatus(jobId)   // approach 2: query job status
via ExecutionEnvironment.


And you can refer this for more details:

https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing




On Fri, Jun 28, 2019 at 10:28 PM Xintong Song  wrote:

> Hi, Singh,
>
> I don't think that should work. The -D or -yD parameters need to be
> passed to the Flink start-up scripts or the "flink run" command. I don't
> think the IntelliJ VM arguments are equivalent to that. In fact, I'm not
> aware of any method to set "-D" parameters when running jobs in the IDE.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Jun 28, 2019 at 8:45 PM M Singh  wrote:
>
>> Hi Xintong:
>>
>> I passed the -Dparallelism.default=2 in the run configuration VM
>> arguments for IntelliJ.
>>
>> So what I am looking for is a way to overwrite the config parameters
>> which are defined in the flink-conf.yaml file (parallelism.default is
>> just an example) which would be picked up regardless of the env (eg:
>> locally, on yarn or IDE).  When I run the application in IDE (locally) with
>> the above mentioned VM parameter, the StreamExecutionEnvironment.config
>> does not show this value and the Flink UI shows configuration parameter
>> parallelism as 8.  Is there any other place where I can see the parameter
>> settings ?
>>
>> Thanks.
>>
>> On Thursday, June 27, 2019, 11:58:28 PM EDT, Xintong Song <
>> tonysong...@gmail.com> wrote:
>>
>>
>> Could you provide some more details on how you run your job with -D
>> options in IDE?
>>
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Fri, Jun 28, 2019 at 5:03 AM M Singh  wrote:
>>
>> Hi Xintong:  Thanks for your pointers.
>>
>> I tried -Dparallelism.default=2 locally (in IDE) and it did not work.  Do
>> you know if there is a common way that would work both for emr, locally and
>> ide ?
>>
>> Thanks again.
>>
>> On Thursday, June 27, 2019, 12:03:06 AM EDT, Xintong Song <
>> tonysong...@gmail.com> wrote:
>>
>>
>> Hi Singh,
>>
>> You can use the environment variable "FLINK_CONF_DIR" to specify path to
>> the directory of config files. You can also override config options with
>> command line arguments prefixed -D (for yarn-session.sh) or -yD (for 'flink
>> run' command).
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Wed, Jun 26, 2019 at 9:13 PM M Singh  wrote:
>>
>> Hi:
>>
>> I have a single EMR cluster with Flink and want to run multiple
>> applications on it with different flink configurations.  Is there a way to
>>
>> 1. Pass the config file name for each application, or
>> 2. Overwrite the config parameters via command line arguments for the
>> application.  This is similar to how we can overwrite the default
>> parameters in spark
>>
>> I searched the documents and have tried using ParameterTool with the
>> config parameter names, but it has not worked as yet.
>>
>> Thanks for your help.
>>
>> Mans
>>
>>

-- 
Best Regards

Jeff Zhang


Re: Building some specific modules in flink

2019-06-24 Thread Jeff Zhang
You need to specify flink-dist in -pl. Module flink-dist will build the
flink binary distribution.
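For example (treat the exact invocation as a sketch rather than a verified recipe): running something like mvn install -pl flink-streaming-java,flink-runtime,flink-dist -DskipTests from the repository root should rebuild the two modified modules and re-package flink-dist, so the changes end up in the distribution under flink-dist/target; mvn install -pl flink-dist -am also works but rebuilds everything flink-dist depends on.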

On Tue, Jun 25, 2019 at 9:14 AM syed  wrote:

> Hi;
> I am trying to modify some core functionalities of flink for my thorough
> understanding of flink.  I already built flink from source; now I am
> looking to build only the few modules which I have modified. Is this
> possible, or do I have to build flink in full (all modules) every time? It
> takes me about 30-35 minutes to build flink in full.
>
> Specifically, I have modified some classes in the *flink-streaming-java* and
> *flink-runtime* modules. I am looking to build only these two modules and
> integrate them into the already built flink (all modules). I already tried
> the -pl option of mvn; it installs these modules, but the changes are not
> picked up by the already built binaries.
> Please guide me on how I can do this.
> Kind regards;
> syed
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] Deprecate previous Python APIs

2019-06-11 Thread Jeff Zhang
+1

On Tue, Jun 11, 2019 at 9:30 PM Stephan Ewen  wrote:

> Hi all!
>
> I would suggest deprecating the existing python APIs for DataSet and
> DataStream API with the 1.9 release.
>
> Background is that there is a new Python API under development.
> The new Python API is initially against the Table API. Flink 1.9 will
> support Table API programs without UDFs, 1.10 is planned to support UDFs.
> Future versions would support also the DataStream API.
>
> In the long term, Flink should have one Python API for DataStream and
> Table APIs. We should not maintain multiple different implementations and
> confuse users that way.
> Given that the existing Python APIs are a bit limited and not under active
> development, I would suggest to deprecate them in favor of the new API.
>
> Best,
> Stephan
>
>

-- 
Best Regards

Jeff Zhang


Re: What does flink session mean ?

2019-06-04 Thread Jeff Zhang
Thanks for the reply, @Till Rohrmann. Regarding
reusing computed results: I think the JM keeps all the metadata of intermediate
data, and interactive programming is also trying to reuse computed results.
It looks like it may not be necessary to introduce the session concept as
long as we can achieve reuse of computed results. Let me know if I understand it
correctly.



On Tue, Jun 4, 2019 at 4:03 PM Till Rohrmann  wrote:

> Hi Jeff,
>
> the session functionality which you find in Flink's client is the
> remnant of an uncompleted feature which was abandoned. The idea was that
> one could submit multiple parts of a job to the same cluster where these
> parts are added to the same ExecutionGraph. That way we wanted to allow to
> reuse computed results when using a notebook for ad-hoc queries, for
> example. But as I said, this feature has never been completed.
>
> Cheers,
> Till
>
> On Sun, Jun 2, 2019 at 3:20 PM Jeff Zhang  wrote:
>
>>
>> Hi Folks,
>>
>>
>> When I read the flink client api code, the concept of session is a little
>> vague and unclear to me. It looks like the session concept is only applied
>> in batch mode (I only see it in ExecutionEnvironment but not in
>> StreamExecutionEnvironment). But for local mode
>> (LocalExecutionEnvironment), starting one new session is starting one new
>> MiniCluster, but in remote mode (RemoteExecutionEnvironment), starting one
>> new session is just starting one new ClusterClient instead of one new
>> cluster. So I am confused what does flink session really mean. Could anyone
>> help me understand this ? Thanks.
>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>

-- 
Best Regards

Jeff Zhang


What does flink session mean ?

2019-06-02 Thread Jeff Zhang
Hi Folks,


When I read the flink client api code, the concept of session is a little
vague and unclear to me. It looks like the session concept is only applied
in batch mode (I only see it in ExecutionEnvironment but not in
StreamExecutionEnvironment). But for local mode
(LocalExecutionEnvironment), starting one new session is starting one new
MiniCluster, but in remote mode (RemoteExecutionEnvironment), starting one
new session is just starting one new ClusterClient instead of one new
cluster. So I am confused about what a flink session really means. Could anyone
help me understand this ? Thanks.




-- 
Best Regards

Jeff Zhang


Re: Ask about running Flink sql-client.sh

2019-05-01 Thread Jeff Zhang
Try ./sql-client.sh embedded



On Wed, May 1, 2019 at 8:28 PM Rad Rad  wrote:

>
> Hi
> I would ask about the command for running sql-client.sh
>
> These  commands don't work
> ./sql-client.sh OR ./flink sql-client
>
> Regards.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best Regards

Jeff Zhang


Re: Zeppelin

2019-04-25 Thread Jeff Zhang
Thanks Dawid,

Hi Sergey,

I am working on updating the flink interpreter of zeppelin to support flink
1.9 (supposed to be released this summer).
For the current flink interpreter of zeppelin 0.9, I haven't verified it
against flink 1.8. Could you show the full interpreter log? And what is
the size of your input file?



On Thu, Apr 25, 2019 at 6:31 PM Dawid Wysakowicz  wrote:

> Hi Sergey,
>
> I am not very familiar with Zepellin. But I know Jeff (cc'ed) is working
> on integrating Flink with some notebooks. He might be able to help you.
>
> Best,
>
> Dawid
> On 25/04/2019 08:42, Smirnov Sergey Vladimirovich (39833) wrote:
>
> Hello,
>
>
>
> Trying to link Zeppelin 0.9 with Flink 1.8. It's a small dev cluster
> deployed in standalone manner.
>
> Got the same error as described here
> https://stackoverflow.com/questions/54257671/runnning-a-job-in-apache-flink-standalone-mode-on-zeppelin-i-have-this-error-to
>
> Would appreciate any support in helping to resolve this problem.
>
>
>
> Regards,
>
> Sergey
>
>
>
>

-- 
Best Regards

Jeff Zhang


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-25 Thread Jeff Zhang
Hi  Beckett,

Thanks for your feedback, See my comments inline

>>>  How do user specify the listener? *
What I propose is to register the JobListener in the ExecutionEnvironment. I don't
think we should make ClusterClient a public api.

>>> Where should the listener run? *
I don't think it is proper to run the listener in the JobMaster. The listener is
user code, and usually it depends on the user's other components. So running
it on the client side makes more sense to me.

>>> What should be reported to the Listener? *
I am open to adding other apis to this JobListener. But for now, I am afraid
the ExecutionEnvironment is not aware of failover, so it is not possible to
report failover events.

>>> What can the listeners do on notifications? *
Do you mean to pass the JobGraph to these methods, like the following? (I am
afraid JobGraph is not a public and stable api; we should not expose it to
users.)

public interface JobListener {

void onJobSubmitted(JobGraph graph, JobID jobId);

void onJobExecuted(JobGraph graph, JobExecutionResult jobResult);

void onJobCanceled(JobGraph graph, JobID jobId, String savepointPath);
}


On Thu, Apr 25, 2019 at 7:40 PM Becket Qin  wrote:

> Thanks for the proposal, Jeff. Adding a listener to allow users handle
> events during the job lifecycle makes a lot of sense to me.
>
> Here are my two cents.
>
> * How do user specify the listener? *
> It is not quite clear to me whether we consider ClusterClient as a public
> interface? From what I understand ClusterClient is not a public interface
> right now. In contrast, ExecutionEnvironment is the de facto interface for
> administrative work. After job submission, it is essentially bound to a job
> as an administrative handle. Given this current state, personally I feel
> acceptable to have the listener registered to the ExecutionEnvironment.
>
> * Where should the listener run? *
> If the listener runs on the client side, the client have to be always
> connected to the Flink cluster. This does not quite work if the Job is a
> streaming job. Should we provide the option to run the listener in
> JobMaster as well?
>
> * What should be reported to the Listener? *
> Besides the proposed APIs, does it make sense to also report events such
> as failover?
>
> * What can the listeners do on notifications? *
> If the listeners are expected to do anything on the job, should some
> helper class to manipulate the jobs be passed to the listener method?
> Otherwise users may not be able to easily take action.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
> On Wed, Apr 24, 2019 at 2:43 PM Jeff Zhang  wrote:
>
>> Hi Till,
>>
>> IMHO, allow adding hooks involves 2 steps.
>> 1. Provide hook interface, and call these hook in flink (ClusterClient)
>> at the right place. This should be done by framework (flink)
>> 2. Implement new hook implementation and add/register them into
>> framework(flink)
>>
>> What I am doing is step 1 which should be done by flink, step 2 is done
>> by users. But IIUC, your suggestion of using custom ClusterClient seems
>> mixing these 2 steps together. Say I'd like to add new hooks, I have to
>> implement a new custom ClusterClient, add new hooks and call them in the
>> custom ClusterClient at the right place.
>> This doesn't make sense to me. For a user who want to add hooks, he is
>> not supposed to understand the mechanism of ClusterClient, and should not
>> touch ClusterClient. What do you think ?
>>
>>
>>
>>
>>> On Tue, Apr 23, 2019 at 4:24 PM Till Rohrmann  wrote:
>>
>>> I think we should not expose the ClusterClient configuration via the
>>> ExecutionEnvironment (env.getClusterClient().addJobListener) because this
>>> is effectively the same as exposing the JobListener interface directly on
>>> the ExecutionEnvironment. Instead I think it could be possible to provide a
>>> ClusterClient factory which is picked up from the Configuration or some
>>> other mechanism for example. That way it would not need to be exposed via
>>> the ExecutionEnvironment at all.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:
>>>
>>>> >>>  The ExecutionEnvironment is usually used by the user who writes
>>>> the code and this person (I assume) would not be really interested in these
>>>> callbacks.
>>>>
>>>> Usually ExecutionEnvironment is used by the user who write the code,
>>>> but it doesn't needs to be created and configured by this person. e.g. in
>>>> Zeppelin notebook, ExecutionEnvironment is created by Zeppelin, user just
>>>>

Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-23 Thread Jeff Zhang
Hi Till,

IMHO, allowing hooks to be added involves 2 steps:
1. Provide the hook interface, and call these hooks in flink (ClusterClient) at
the right place. This should be done by the framework (flink).
2. Implement new hook implementations and add/register them into the
framework (flink).

What I am doing is step 1, which should be done by flink; step 2 is done by
users. But IIUC, your suggestion of using a custom ClusterClient seems to mix
these 2 steps together. Say I'd like to add new hooks: I would have to implement
a new custom ClusterClient, add the new hooks, and call them in the custom
ClusterClient at the right place.
This doesn't make sense to me. A user who wants to add hooks is not
supposed to understand the mechanism of ClusterClient, and should not touch
ClusterClient. What do you think?




On Tue, Apr 23, 2019 at 4:24 PM Till Rohrmann  wrote:

> I think we should not expose the ClusterClient configuration via the
> ExecutionEnvironment (env.getClusterClient().addJobListener) because this
> is effectively the same as exposing the JobListener interface directly on
> the ExecutionEnvironment. Instead I think it could be possible to provide a
> ClusterClient factory which is picked up from the Configuration or some
> other mechanism for example. That way it would not need to be exposed via
> the ExecutionEnvironment at all.
>
> Cheers,
> Till
>
> On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:
>
>> >>>  The ExecutionEnvironment is usually used by the user who writes the
>> code and this person (I assume) would not be really interested in these
>> callbacks.
>>
>> Usually ExecutionEnvironment is used by the user who write the code, but
>> it doesn't needs to be created and configured by this person. e.g. in
>> Zeppelin notebook, ExecutionEnvironment is created by Zeppelin, user just
>> use ExecutionEnvironment to write flink program.  You are right that the
>> end user would not be interested in these callback, but the third party
>> library that integrate with zeppelin would be interested in these callbacks.
>>
>> >>> In your case, it could be sufficient to offer some hooks for the
>> ClusterClient or being able to provide a custom ClusterClient.
>>
>> Actually in my initial PR (https://github.com/apache/flink/pull/8190), I
>> do pass JobListener to ClusterClient and invoke it there.
>> But IMHO, ClusterClient is not supposed be a public api for users.
>> Instead JobClient is the public api that user should use to control job. So
>> adding hooks to ClusterClient directly and provide a custom ClusterClient
>> doesn't make sense to me. IIUC, you are suggesting the following approach
>>  env.getClusterClient().addJobListener(jobListener)
>> but I don't see its benefit compared to this.
>>  env.addJobListener(jobListener)
>>
>> Overall, I think adding hooks is orthogonal with fine grained job
>> control. And I agree that we should refactor the flink client component,
>> but I don't think it would affect the JobListener interface. What do you
>> think ?
>>
>>
>>
>>
>> On Thu, Apr 18, 2019 at 8:57 PM Till Rohrmann  wrote:
>>
>>> Thanks for starting this discussion Jeff. I can see the need for
>>> additional hooks for third party integrations.
>>>
>>> The thing I'm wondering is whether we really need/want to expose a
>>> JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
>>> usually used by the user who writes the code and this person (I assume)
>>> would not be really interested in these callbacks. If he would, then one
>>> should rather think about a better programmatic job control where the
>>> `ExecutionEnvironment#execute` call returns a `JobClient` instance.
>>> Moreover, we would effectively make this part of the public API and every
>>> implementation would need to offer it.
>>>
>>> In your case, it could be sufficient to offer some hooks for the
>>> ClusterClient or being able to provide a custom ClusterClient. The
>>> ClusterClient is the component responsible for the job submission and
>>> retrieval of the job result and, hence, would be able to signal when a job
>>> has been submitted or completed.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 18, 2019 at 8:57 AM vino yang  wrote:
>>>
>>>> Hi Jeff,
>>>>
>>>> I personally like this proposal. From the perspective of
>>>> programmability, the JobListener can make the third program more
>>>> appreciable.
>>>>
>>>> The scene where I need the listener is the Flink cube engine for Apache
>>

Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-19 Thread Jeff Zhang
>>>  The ExecutionEnvironment is usually used by the user who writes the
code and this person (I assume) would not be really interested in these
callbacks.

Usually the ExecutionEnvironment is used by the user who writes the code, but it
doesn't need to be created and configured by this person. E.g. in a Zeppelin
notebook, the ExecutionEnvironment is created by Zeppelin, and the user just uses
it to write the flink program. You are right that the end
user would not be interested in these callbacks, but the third-party library
that integrates with zeppelin would be.

>>> In your case, it could be sufficient to offer some hooks for the
ClusterClient or being able to provide a custom ClusterClient.

Actually in my initial PR (https://github.com/apache/flink/pull/8190), I do
pass JobListener to ClusterClient and invoke it there.
But IMHO, ClusterClient is not supposed to be a public api for users. Instead,
JobClient is the public api that users should use to control jobs. So adding
hooks to ClusterClient directly and providing a custom ClusterClient doesn't
make sense to me. IIUC, you are suggesting the following approach:
 env.getClusterClient().addJobListener(jobListener)
but I don't see its benefit compared to this.
 env.addJobListener(jobListener)

Overall, I think adding hooks is orthogonal to fine-grained job control.
And I agree that we should refactor the flink client component, but I don't
think it would affect the JobListener interface. What do you think ?




On Thu, Apr 18, 2019 at 8:57 PM Till Rohrmann  wrote:

> Thanks for starting this discussion Jeff. I can see the need for
> additional hooks for third party integrations.
>
> The thing I'm wondering is whether we really need/want to expose a
> JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
> usually used by the user who writes the code and this person (I assume)
> would not be really interested in these callbacks. If he would, then one
> should rather think about a better programmatic job control where the
> `ExecutionEnvironment#execute` call returns a `JobClient` instance.
> Moreover, we would effectively make this part of the public API and every
> implementation would need to offer it.
>
> In your case, it could be sufficient to offer some hooks for the
> ClusterClient or being able to provide a custom ClusterClient. The
> ClusterClient is the component responsible for the job submission and
> retrieval of the job result and, hence, would be able to signal when a job
> has been submitted or completed.
>
> Cheers,
> Till
>
> On Thu, Apr 18, 2019 at 8:57 AM vino yang  wrote:
>
>> Hi Jeff,
>>
>> I personally like this proposal. From the perspective of programmability,
>> the JobListener can make the third program more appreciable.
>>
>> The scene where I need the listener is the Flink cube engine for Apache
>> Kylin. In the case, the Flink job program is embedded into the Kylin's
>> executable context.
>>
>> If we could have this listener, it would be easier to integrate with
>> Kylin.
>>
>> Best,
>> Vino
>>
>> On Thu, Apr 18, 2019 at 1:30 PM Jeff Zhang  wrote:
>>
>>>
>>> Hi All,
>>>
>>> I created FLINK-12214
>>> <https://issues.apache.org/jira/browse/FLINK-12214> for adding
>>> JobListener (hook) in flink job lifecycle. Since this is a new public api
>>> for flink, so I'd like to discuss it more widely in community to get more
>>> feedback.
>>>
>>> The background and motivation is that I am integrating flink into apache
>>> zeppelin <http://zeppelin.apache.org/>(which is a notebook in case you
>>> don't know). And I'd like to capture some job context (like jobId) in the
>>> lifecycle of flink job (submission, executed, cancelled) so that I can
>>> manipulate job in more fined grained control (e.g. I can capture the jobId
>>> when job is submitted, and then associate it with one paragraph, and when
>>> user click the cancel button, I can call the flink cancel api to cancel
>>> this job)
>>>
>>> I believe other projects which integrate flink would need similar
>>> mechanism. I plan to add api addJobListener in
>>> ExecutionEnvironment/StreamExecutionEnvironment so that user can add
>>> customized hook in flink job lifecycle.
>>>
>>> Here's draft interface JobListener.
>>>
>>> public interface JobListener {
>>>
>>> void onJobSubmitted(JobID jobId);
>>>
>>> void onJobExecuted(JobExecutionResult jobResult);
>>>
>>> void onJobCanceled(JobID jobId, String savepointPath);
>>> }
>>>
>>> Let me know your comment and concern, thanks.
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>

-- 
Best Regards

Jeff Zhang


[Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-17 Thread Jeff Zhang
Hi All,

I created FLINK-12214 <https://issues.apache.org/jira/browse/FLINK-12214> for
adding a JobListener (hook) to the flink job lifecycle. Since this is a new
public api for flink, I'd like to discuss it more widely in the community to
get more feedback.

The background and motivation is that I am integrating flink into apache
zeppelin <http://zeppelin.apache.org/>(which is a notebook in case you
don't know). And I'd like to capture some job context (like jobId) in the
lifecycle of a flink job (submitted, executed, cancelled) so that I can
manipulate the job with more fine-grained control (e.g. I can capture the jobId
when the job is submitted, then associate it with one paragraph, and when the
user clicks the cancel button, I can call the flink cancel api to cancel

I believe other projects which integrate flink would need a similar
mechanism. I plan to add an addJobListener api to
ExecutionEnvironment/StreamExecutionEnvironment so that users can add
customized hooks into the flink job lifecycle.

Here's draft interface JobListener.

public interface JobListener {

void onJobSubmitted(JobID jobId);

void onJobExecuted(JobExecutionResult jobResult);

void onJobCanceled(JobID jobId, String savepointPath);
}
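
To make the intended usage a bit more concrete, here is a purely illustrative sketch. ParagraphJobListener and the paragraph-tracking map are made-up names for the Zeppelin-style use case described above, and env.addJobListener(...) is the API proposed in this mail, not an existing Flink method.

import java.util.concurrent.ConcurrentMap;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;

public class ParagraphJobListener implements JobListener {

    // shared map: jobId -> id of the notebook paragraph that submitted the job,
    // so a cancel button can later look up which job to cancel
    private final ConcurrentMap<JobID, String> jobsByParagraph;
    private final String paragraphId;

    public ParagraphJobListener(ConcurrentMap<JobID, String> jobsByParagraph, String paragraphId) {
        this.jobsByParagraph = jobsByParagraph;
        this.paragraphId = paragraphId;
    }

    @Override
    public void onJobSubmitted(JobID jobId) {
        jobsByParagraph.put(jobId, paragraphId);
    }

    @Override
    public void onJobExecuted(JobExecutionResult jobResult) {
        jobsByParagraph.remove(jobResult.getJobID());
    }

    @Override
    public void onJobCanceled(JobID jobId, String savepointPath) {
        jobsByParagraph.remove(jobId);
    }
}

// hypothetical registration via the proposed API:
// env.addJobListener(new ParagraphJobListener(sharedMap, "paragraph-1"));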

Let me know your comment and concern, thanks.


-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-14 Thread Jeff Zhang
cific
>>>> timeline. Instead, we share our vision for the future and major initiatives
>>>> that are receiving attention and give users and contributors an
>>>> understanding what they can look forward to.
>>>>
>>>> *Future Role of Table API and DataStream API*
>>>>   - Table API becomes first class citizen
>>>>   - Table API becomes primary API for analytics use cases
>>>>   * Declarative, automatic optimizations
>>>>   * No manual control over state and timers
>>>>   - DataStream API becomes primary API for applications and data
>>>> pipeline use cases
>>>>   * Physical, user controls data types, no magic or optimizer
>>>>   * Explicit control over state and time
>>>>
>>>> *Batch Streaming Unification*
>>>>   - Table API unification (environments) (FLIP-32)
>>>>   - New unified source interface (FLIP-27)
>>>>   - Runtime operator unification & code reuse between DataStream / Table
>>>>   - Extending Table API to make it convenient API for all analytical
>>>> use cases (easier mix in of UDFs)
>>>>   - Same join operators on bounded/unbounded Table API and DataStream
>>>> API
>>>>
>>>> *Faster Batch (Bounded Streams)*
>>>>   - Much of this comes via Blink contribution/merging
>>>>   - Fine-grained Fault Tolerance on bounded data (Table API)
>>>>   - Batch Scheduling on bounded data (Table API)
>>>>   - External Shuffle Services Support on bounded streams
>>>>   - Caching of intermediate results on bounded data (Table API)
>>>>   - Extending DataStream API to explicitly model bounded streams (API
>>>> breaking)
>>>>   - Add fine fault tolerance, scheduling, caching also to DataStream API
>>>>
>>>> *Streaming State Evolution*
>>>>   - Let all built-in serializers support stable evolution
>>>>   - First class support for other evolvable formats (Protobuf, Thrift)
>>>>   - Savepoint input/output format to modify / adjust savepoints
>>>>
>>>> *Simpler Event Time Handling*
>>>>   - Event Time Alignment in Sources
>>>>   - Simpler out-of-the box support in sources
>>>>
>>>> *Checkpointing*
>>>>   - Consistency of Side Effects: suspend / end with savepoint (FLIP-34)
>>>>   - Failed checkpoints explicitly aborted on TaskManagers (not only on
>>>> coordinator)
>>>>
>>>> *Automatic scaling (adjusting parallelism)*
>>>>   - Reactive scaling
>>>>   - Active scaling policies
>>>>
>>>> *Kubernetes Integration*
>>>>   - Active Kubernetes Integration (Flink actively manages containers)
>>>>
>>>> *SQL Ecosystem*
>>>>   - Extended Metadata Stores / Catalog / Schema Registries support
>>>>   - DDL support
>>>>   - Integration with Hive Ecosystem
>>>>
>>>> *Simpler Handling of Dependencies*
>>>>   - Scala in the APIs, but not in the core (hide in separate class
>>>> loader)
>>>>   - Hadoop-free by default
>>>>
>>>>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] New Flink PMC member Thomas Weise

2019-02-12 Thread Jeff Zhang
Congrats Thomas !

On Tue, Feb 12, 2019 at 5:59 PM Fabian Hueske  wrote:

> Hi everyone,
>
> On behalf of the Flink PMC I am happy to announce Thomas Weise as a new
> member of the Apache Flink PMC.
>
> Thomas is a long time contributor and member of our community.
> He is starting and participating in lots of discussions on our mailing
> lists, working on topics that are of joint interest of Flink and Beam, and
> giving talks on Flink at many events.
>
> Please join me in welcoming and congratulating Thomas!
>
> Best,
> Fabian
>


-- 
Best Regards

Jeff Zhang


Re: Flink 1.7 Notebook Environment

2019-02-10 Thread Jeff Zhang
Hi Faizan,

I have implemented a flink interpreter for blink, which was donated by
alibaba to the flink community recently. Maybe you have noticed this news.

Here are some tutorials you may be interested in:

https://flink-china.org/doc/blink/ops/zeppelin.html
https://flink-china.org/doc/blink/quickstart/zeppelin_quickstart.html

And here's the code base: https://github.com/zjffdu/zeppelin/tree/blink_poc


On Mon, Feb 11, 2019 at 11:44 AM Faizan Ahmed  wrote:

> Hi all,
> I have been searching around quite a bit and doing my own experiments to
> make the latest Flink release, 1.7.1, work with Apache Zeppelin; however,
> Apache Zeppelin's Flink interpreter is quite outdated (Flink 1.3). AFAIK
> it's not possible to use Flink running on YARN via Zeppelin, as it only works
> with a local cluster.
>
> Has anyone been able to run Flink's latest release on Zeppelin? If yes,
> then please share some instructions/tutorials. If not, is there any other
> suitable notebook environment for running Flink (maybe Jupyter)? I want to
> prototype my ideas in Flink, and since I'm coming from a Spark background it
> would be really useful to have a notebook environment for validation of flink
> apps.
>
> Looking forward to your response
>
> Thanks
>


-- 
Best Regards

Jeff Zhang


Re: Is there a way to get all flink build-in SQL functions

2019-01-25 Thread Jeff Zhang
I believe it makes sense to list the available udfs programmatically, e.g. users
may want to see the available udfs in the sql-client. It would also benefit other
downstream projects that use flink sql. Besides that, I think flink should
also provide an api for querying the description of a udf, i.e. how to use it.

On Fri, Jan 25, 2019 at 5:12 PM yinhua.dai  wrote:

> Thanks Guys.
> I was just wondering if there is another way besides hard-coding the list :)
> Thanks anyway.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-21 Thread Jeff Zhang
Thanks Chesnay for raising this discussion thread. I think there are 3
major use scenarios for the flink binary distribution:

1. Using it to set up a standalone cluster
2. Using it to experience the features of flink, such as via the scala-shell or
sql-client
3. Downstream projects using it to integrate with their systems

I did a size estimation of the flink dist folder: the lib folder takes around 100M
and the opt folder takes around 200M. Overall I agree with making a thin flink
dist, so the next question is which components to drop. I checked the opt folder,
and I think the filesystem and metrics components could be moved
out, because they are pluggable components and are only used in scenario 1 I
think (setting up a standalone cluster). Other components like flink-table,
flink-ml and flink-gelly we should still keep IMHO, because new users may
still use them to try out the features of flink. For me, the scala-shell is the
first option for trying new features of flink.



On Fri, Jan 18, 2019 at 7:34 PM Fabian Hueske  wrote:

> Hi Chesnay,
>
> Thank you for the proposal.
> I think this is a good idea.
> We follow a similar approach already for Hadoop dependencies and
> connectors (although in application space).
>
> +1
>
> Fabian
>
> On Fri, Jan 18, 2019 at 10:59 AM Chesnay Schepler <
> ches...@apache.org> wrote:
>
>> Hello,
>>
>> the binary distribution that we release by now contains quite a lot of
>> optional components, including various filesystems, metric reporters and
>> libraries. Most users will only use a fraction of these, and as such
>> pretty much only increase the size of flink-dist.
>>
>> With Flink growing more and more in scope I don't believe it to be
>> feasible to ship everything we have with every distribution, and instead
>> suggest more of a "pick-what-you-need" model, where flink-dist is rather
>> lean and additional components are downloaded separately and added by
>> the user.
>>
>> This would primarily affect the /opt directory, but could also be
>> extended to cover flink-dist. For example, the yarn and mesos code could
>> be spliced out into separate jars that could be added to lib manually.
>>
>> Let me know what you think.
>>
>> Regards,
>>
>> Chesnay
>>
>>

-- 
Best Regards

Jeff Zhang


Re: Query on retract stream

2019-01-21 Thread Jeff Zhang
I am thinking of another approach instead of a retract stream. Is it possible
to define a custom window to do this? The window would be defined for each
order, and then you just need to analyze the events in that window.
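
To sketch the idea (illustrative only: Event is an assumed POJO with user, order and status fields, and all class and field names are made up rather than taken from this thread), one could key the changelog by (user, order), keep per-order state recording whether the order is currently pending, emit +1/-1 deltas on transitions, and sum the deltas per user:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PendingDelta extends KeyedProcessFunction<Tuple2<String, String>, Event, Tuple2<String, Integer>> {

    // whether the order of the current (user, order) key is currently pending
    private transient ValueState<Boolean> pending;

    @Override
    public void open(Configuration parameters) {
        pending = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending", Types.BOOLEAN));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        boolean wasPending = Boolean.TRUE.equals(pending.value());
        boolean isPending = "pending".equals(event.status);
        if (isPending && !wasPending) {
            out.collect(Tuple2.of(event.user, 1));   // order entered pending
        } else if (!isPending && wasPending) {
            out.collect(Tuple2.of(event.user, -1));  // order left pending (success/failed)
        }
        pending.update(isPending);
    }
}

// assumed POJO for the changelog records
class Event {
    public String user;
    public String order;
    public String status;
}

// usage sketch (per-order state, then a running pending count per user):
// events.keyBy(e -> Tuple2.of(e.user, e.order))
//       .process(new PendingDelta())
//       .keyBy(t -> t.f0)
//       .sum(1);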

On Mon, Jan 21, 2019 at 8:44 PM Piotr Nowojski  wrote:

> Hi,
>
> There is a missing feature in Flink Table API/SQL of supporting retraction
> streams as the input (or conversions from append stream to retraction
> stream) at the moment. With that your problem would simplify to one simple
> `SELECT uid, count(*) FROM Changelog GROUP BY uid`. There is an ongoing
> work with related work [1], so this might be supported in the next couple
> of months.
>
> There might be a workaround at the moment that could work. I think you would
> need to write your own custom `LAST_ROW(x)` aggregation function, which
> would just return the value of the most recent aggregated row. With that
> you could write a query like this:
>
> SELECT
> uid, count(*)
> FROM (
> SELECT
> *
> FROM (
> SELECT
> uid, LAST_ROW(status)
> FROM
> changelog
> GROUP BY
> uid, oid)
> WHERE status = `pending`)
> GROUP BY
> uid
>
> Where `changelog` is an append only stream with the following content:
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
>
>
> Besides that, you could also write your own a relatively simple Data
> Stream application to do the same thing.
>
> I’m CC’ing Timo, maybe he will have another better idea.
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-8577
>
> On 18 Jan 2019, at 18:30, Gagan Agrawal  wrote:
>
> Hi,
> I have a requirement and need to understand if same can be achieved with
> Flink retract stream. Let's say we have stream with 4 attributes userId,
> orderId, status, event_time where orderId is unique and hence any change in
> same orderId updates previous value as below
>
> *Changelog* *Event Stream*
>
> *user, order, status, event_time*
> u1, o1, pending, t1
> u2, o2, failed, t2
> *u1, o3, pending, t3*
> *u1, o3, success, t4*
> u2, o4, pending, t5
> u2, o4, pending, t6
>
> *Snapshot view at time t6 (as viewed in mysql)*
> u1, o1, pending, t1
> u2, o2, failed, t2
> u1, o3, success, t4
> u4, o4, pending, t6
> (Here rows at time t3 and t5 are deleted as they have been updated for
> respective order ids)
>
> What I need is to maintain count of "Pending" orders against a user and if
> they go beyond configured threshold, then push that user and pending count
> to Kafka. Here there can be multiple updates to order status e.g Pending ->
> Success or Pending -> Failed. Also in some cases there may not be any
> change in status but we may still get a row (may be due to some other
> attribute update which we are not concerned about). So is it possible to
> have running count in flink as below at respective event times. Here
> Pending count is decreased from 2 to 1 for user u1 at t4 since one of it's
> order status was changed from Pending to Success. Similarly for user u2, at
> time t6, there was no change in running count as there was no change in
> status for order o4
>
> t1 -> u1 : 1, u2 : 0
> t2 -> u1 : 1, u2 : 0
> t3 -> u1 : 2, u2 : 0
> *t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is
> decreased for u1)*
> t5 -> u1 : 1, u2 : 1
> *t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no
> change)*
>
> As I understand may be retract stream can achieve this. However I am not
> sure how. Any samples around this would be of great help.
>
> Gagan
>
>
>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Apache Flink 1.5.6 released

2018-12-26 Thread Jeff Zhang
Thanks Thomas. It's nice to have a more stable flink 1.5.x

On Thu, Dec 27, 2018 at 9:43 AM vino yang  wrote:

> Thomas, thanks for being a release manager.
> And Thanks for the whole community.
> I think the release of Flink 1.5.6 makes sense for many users who are
> currently unable to upgrade major versions.
>
> Best,
> Vino
>
> On Thu, Dec 27, 2018 at 8:00 AM jincheng sun  wrote:
>
>> Thanks a lot for being our release manager Thomas.
>> Thanks a lot for making this release possible!
>>
>> Cheers,
>> Jincheng
>>
>> On Thu, Dec 27, 2018 at 4:03 AM Thomas Weise  wrote:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache
>>> Flink 1.5.6, which is the final bugfix release for the Apache Flink 1.5
>>> series.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data
>>> streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements
>>> for this bugfix release:
>>> https://flink.apache.org/news/2018/12/22/release-1.5.6.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344315
>>>
>>> We would like to thank all contributors of the Apache Flink community who
>>> made this release possible!
>>>
>>> Regards,
>>> Thomas
>>>
>>

-- 
Best Regards

Jeff Zhang


Re: Flink with Docker: docker-compose and FLINK_JOB_ARGUMENT exception

2018-12-07 Thread Jeff Zhang
I haven't used the built-in docker setup of flink, but the following flink
docker image works pretty well for me.

https://github.com/big-data-europe/docker-flink



On Fri, Dec 7, 2018 at 6:20 PM Piotr Nowojski  wrote:

> Hi,
>
> I have never used flink and docker together, so I’m not sure if I will be
> able to help, however have you seen this README:
> https://github.com/apache/flink/tree/master/flink-container/docker
> ?
> Shouldn’t you be passing your arguments via `FLINK_JOB_ARGUMENTS`
> environment variable?
>
> Piotrek
>
> On 7 Dec 2018, at 10:55, Marke Builder  wrote:
>
> Hi,
>
> I'm trying to run flink with docker (docker-compose) and job arguments
> "config-dev.properties". But it seams that the job arguments are not
> available:
>
> docker-compose.yml
>
> version: '2'
> services:
>   job-cluster:
> image: ${FLINK_DOCKER_IMAGE_NAME:-timeseries-v1}
> ports:
>   - '8081:8081'
> command: job-cluster --job-classname
> -Djobmanager.rpc.address=job-cluster -Dparallelism.default=1 --config
> config-dev.properties
>
>   taskmanager:
> image: ${FLINK_DOCKER_IMAGE_NAME:-timeseries-v1}
> command: task-manager -Djobmanager.rpc.address=job-cluster
> scale: 1
>
>
> Excpetion:
> 4:32 AMorg.apache.flink.runtime.entrypoint.FlinkParseException: Failed to
> parse the command line arguments.
> 12/7/2018 10:44:32 AM  at
> org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:52)
> 12/7/2018 10:44:32 AM  at
> org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:143)
> 12/7/2018 10:44:32 AMCaused by:
> org.apache.commons.cli.MissingArgumentException: Missing argument for
> option: j
> 12/7/2018 10:44:32 AM  at
> org.apache.commons.cli.DefaultParser.checkRequiredArgs(DefaultParser.java:211)
> 12/7/2018 10:44:32 AM  at
> org.apache.commons.cli.DefaultParser.handleOption(DefaultParser.java:599)
> 12/7/2018 10:44:32 AM  at
> org.apache.commons.cli.DefaultParser.handleShortAndLongOption(DefaultParser.java:548)
> 12/7/2018 10:44:32 AM  at
> org.apache.commons.cli.DefaultParser.handleToken(DefaultParser.java:243)
> 12/7/2018 10:44:32 AM  at
> org.apache.commons.cli.DefaultParser.parse(DefaultParser.java:120)
> 12/7/2018 10:44:32 AM  at
> org.apache.commons.cli.DefaultParser.parse(DefaultParser.java:81)
> 12/7/2018 10:44:32 AM  at
> org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:50)
> 12/7/2018 10:44:32 AM  ... 1 more
> 12/7/2018 10:44:32 AMException in thread "main"
> java.lang.NoSuchMethodError:
> org.apache.flink.runtime.entrypoint.parser.CommandLineParser.printHelp()V
> 12/7/2018 10:44:32 AM  at
> org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:146)
>
>
>

-- 
Best Regards

Jeff Zhang


Re: Assigning a port range to rest.port

2018-12-05 Thread Jeff Zhang
This requirement makes sense to me. Another issue I hit due to the single-valued
rest port is that a user cannot start 2 local MiniClusters: I tried to start
2 flink scala-shells in local mode, but it failed due to a port conflict.



On Wed, Dec 5, 2018 at 8:04 PM Gyula Fóra  wrote:

> Hi!
> Is there any way currently to set a port range for the rest client?
> rest.port only takes a single number and it is anyways overwritten to 0.
>
> This seems to be necessary when running the flink client from behind a
> firewall where only a predefined port-range is accessible from the outside.
>
> I would assume this is a common setup in prod environments. This hasn't
> been a problem with the legacy execution mode.
>
> Any thoughts?
> Gyula
>


-- 
Best Regards

Jeff Zhang


Re: Are flink connectors included in the binary release ?

2018-11-14 Thread Jeff Zhang
Thanks Chesnay, but if users want to use connectors in the scala shell, they
have to download them.

On Wed, Nov 14, 2018 at 5:22 PM Chesnay Schepler  wrote:

> Connectors are never contained in binary releases as they are supposed to
> be packaged into the user-jar.
>
> On 14.11.2018 10:12, Jeff Zhang wrote:
>
>
> I don't see the jars of flink connectors in the binary release of flink
> 1.6.1, so just want to confirm whether flink binary release include these
> connectors. Thanks
>
> --
> Best Regards
>
> Jeff Zhang
>
>
>

-- 
Best Regards

Jeff Zhang


Are flink connectors included in the binary release ?

2018-11-14 Thread Jeff Zhang
I don't see the jars of the flink connectors in the binary release of flink
1.6.1, so I just want to confirm whether the flink binary release includes
these connectors. Thanks

-- 
Best Regards

Jeff Zhang


Re: Confused window operation

2018-11-13 Thread Jeff Zhang
Thanks hequn & acqua.csq 
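
For anyone else puzzled by the same output, here is how I would line up the printed lines with the per-key count windows described below (countWindow(2, 1) fires once per element, over at most the last 2 elements of that key); this is just my reading of it, not an authoritative trace:

(hello,1)   <- key "hello", 1st element, window [hello]
(world,1)   <- key "world", 1st element, window [world]
(hello,2)   <- key "hello", 2nd element, window [hello, hello]
(flink,1)   <- key "flink", 1st element, window [flink]
(hello,2)   <- key "hello", 3rd element, window [hello, hello] (only the last 2 elements are kept)
(hadoop,1)  <- key "hadoop", 1st element, window [hadoop]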

On Wed, Nov 14, 2018 at 2:17 PM Hequn Cheng  wrote:

> Hi Jeff,
>
> The window is not a global window. It is related to a specified key. You
> would have 6 windows after flatMap() and keyBy().
> key: hello with 3 windows
> key: world with 1 window
> key: flink with 1 window
> key: hadoop with 1 window
>
> Best, Hequn
>
>
> On Wed, Nov 14, 2018 at 10:31 AM Jeff Zhang  wrote:
>
>> Hi all,
>>
>> I am a little confused with the following windows operation. Here's the
>> code,
>>
>> val senv = StreamExecutionEnvironment.getExecutionEnvironment
>> senv.setParallelism(1)
>> val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
>>
>> data.flatMap(line => line.split("\\s"))
>>   .map(w => (w, 1))
>>   .keyBy(0)
>>   .countWindow(2, 1)
>>   .sum(1)
>>   .print("**")
>>
>> senv.execute()
>>
>>
>> And this is the output:
>>
>> **> (hello,1)
>> **> (world,1)
>> **> (hello,2)
>> **> (flink,1)
>> **> (hello,2)
>> **> (hadoop,1)
>>
>>
>> As my understanding, here we have 3 windows.
>>
>> window 1
>>
>> (hello, world)
>>
>> window 2
>>
>> (hello, world)
>>
>> (hello, flink)
>>
>> window 3
>>
>> (hello flink)
>>
>> (hello hadoop)
>>
>> So for the first window, we have output (hello, 1) (world, 1)
>>
>> for the second window we should output (hello, 2), (world,1 ), (flink, 1)
>>
>> for the third window we should have output (hello, 2), (flink, 1), (hadoop, 
>> 1)
>>
>>
>> But as you can see, in the above I get different result, do I misunderstand 
>> the window ? Could anyone help me to understand that ? Thanks
>>
>>

-- 
Best Regards

Jeff Zhang


Re: Field could not be resolved by the field mapping when using kafka connector

2018-11-13 Thread Jeff Zhang
Thanks hequn, it is very helpful

On Wed, Nov 14, 2018 at 2:32 PM Hequn Cheng  wrote:

> Hi jeff,
>
> We need a different field name for the rowtime indicator, something looks
> like:
>
>>   new Schema()
>> .field("status", Types.STRING)
>> .field("direction", Types.STRING)
>> .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
>> new
>> Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
>
>
> Furthermore, we should define another sink schema which contains no
> rowtime definitions, since currently time attributes and custom field
> mappings are not supported yet for sink.
>
>> val sinkSchema =
>>   new Schema()
>> .field("status", Types.STRING)
>> .field("direction", Types.STRING)
>> .field("rowtime", Types.SQL_TIMESTAMP)
>
>
> Btw, a unified api for source and sink is under discussion now. More
> details here[1]
>
> Best, Hequn
>
> [1]
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#heading=h.41fd6rs7b3cf
>
>
> On Wed, Nov 14, 2018 at 9:18 AM Jeff Zhang  wrote:
>
>>
>> Hi,
>>
>> I hit the following error when I try to use kafka connector in flink
>> table api. There's very little document about how to use kafka connector in
>> flink table api, could anyone help me on that ? Thanks
>>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Field 'event_ts' could not
>> be resolved by the field mapping.
>> at
>> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
>> at
>> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
>> at
>> org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
>> at
>> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
>> at
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
>> at
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
>>
>> And here's the source code:
>>
>>
>>
>>  case class Record(status: String, direction: String, var event_ts: 
>> Timestamp)
>>
>>
>>   def main(args: Array[String]): Unit = {
>> val senv = StreamExecutionEnvironment.getExecutionEnvironment
>> senv.setParallelism(1)
>> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>
>> val data: DataStream[Record] = ...
>> val tEnv = TableEnvironment.getTableEnvironment(senv)
>> tEnv
>>   // declare the external system to connect to
>>   .connect(
>>   new Kafka()
>> .version("0.11")
>> .topic("processed5.events")
>> .startFromEarliest()
>> .property("zookeeper.connect", "localhost:2181")
>> .property("bootstrap.servers", "localhost:9092"))
>>   .withFormat(new Json()
>>     .failOnMissingField(false)
>> .deriveSchema()
>>   )
>>   .withSchema(
>> new Schema()
>>   .field("status", Types.STRING)
>>   .field("direction", Types.STRING)
>>   .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
>>   new 
>> Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
>>   )
>>
>>   // specify the update-mode for streaming tables
>>   .inAppendMode()
>>
>>   // register as source, sink, or both and under a name
>>   .registerTableSourceAndSink("MyUserTable");
>>
>> tEnv.fromDataStream(data).insertInto("MyUserTable")
>>
>> 0封新邮件
>> 回复
>>
>>

-- 
Best Regards

Jeff Zhang


Confused window operation

2018-11-13 Thread Jeff Zhang
Hi all,

I am a little confused with the following windows operation. Here's the
code,

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setParallelism(1)
val data = senv.fromElements("hello world", "hello flink", "hello hadoop")

data.flatMap(line => line.split("\\s"))
  .map(w => (w, 1))
  .keyBy(0)
  .countWindow(2, 1)
  .sum(1)
  .print("**")

senv.execute()


And this is the output:

**> (hello,1)
**> (world,1)
**> (hello,2)
**> (flink,1)
**> (hello,2)
**> (hadoop,1)


As I understand it, here we have 3 windows.

window 1

(hello, world)

window 2

(hello, world)

(hello, flink)

window 3

(hello flink)

(hello hadoop)

So for the first window, we have output (hello, 1) (world, 1)

for the second window we should output (hello, 2), (world,1 ), (flink, 1)

for the third window we should have output (hello, 2), (flink, 1), (hadoop, 1)


But as you can see above, I get a different result. Do I
misunderstand the window? Could anyone help me understand this?
Thanks


Field could not be resolved by the field mapping when using kafka connector

2018-11-13 Thread Jeff Zhang
Hi,

I hit the following error when I try to use the kafka connector in the flink
table api. There's very little documentation about how to use the kafka
connector in the flink table api; could anyone help me with that? Thanks

Exception in thread "main" org.apache.flink.table.api.ValidationException:
Field 'event_ts' could not be resolved by the field mapping.
at
org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
at
org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
at
org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at
org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
at
org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
at
org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
at
org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
at
org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
at
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
at
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)

And here's the source code:



 case class Record(status: String, direction: String, var event_ts: Timestamp)


  def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setParallelism(1)
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val data: DataStream[Record] = ...
val tEnv = TableEnvironment.getTableEnvironment(senv)
tEnv
  // declare the external system to connect to
  .connect(
  new Kafka()
.version("0.11")
.topic("processed5.events")
.startFromEarliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092"))
  .withFormat(new Json()
.failOnMissingField(false)
.deriveSchema()
  )
  .withSchema(
new Schema()
  .field("status", Types.STRING)
  .field("direction", Types.STRING)
  .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
  new 
Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
  )

  // specify the update-mode for streaming tables
  .inAppendMode()

  // register as source, sink, or both and under a name
  .registerTableSourceAndSink("MyUserTable");

tEnv.fromDataStream(data).insertInto("MyUserTable")


Re: Starting a separate Java process within a Flink cluster

2018-11-02 Thread Jeff Zhang
The error is most likely due to a classpath issue, because the classpath is
different when you run a Flink program in the IDE and when you run it in a
cluster.

Also, starting another JVM process from a SourceFunction doesn't seem like a
good approach to me; is it possible for you to do the buffering directly in
your custom SourceFunction instead?
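
For context, the JavaProcessBuilder.exec snippet quoted further down builds the
child JVM's classpath from System.getProperty("java.class.path"), which on a
cluster typically does not contain the submitted user jar, because user code is
loaded by a separate user-code classloader. Below is a minimal, untested Scala
sketch of one way to derive a classpath that does include it, assuming the class
to launch is packaged inside the user jar:

import java.io.File

object ChildClasspath {
  // Hedged sketch: append the jar (or classes directory) that contains the given
  // class to the classpath inherited from this JVM, so a child process started via
  // ProcessBuilder can load it even when java.class.path does not include the user jar.
  // Note: getCodeSource may return null for classes loaded by the bootstrap classloader.
  def forClass(clazz: Class[_]): String = {
    val codeSource = clazz.getProtectionDomain.getCodeSource.getLocation.getPath
    System.getProperty("java.class.path") + File.pathSeparator + codeSource
  }
}

For example, ChildClasspath.forClass(classOf[MessageBufferProcess]) could be passed
as the "-cp" argument instead of the raw java.class.path value.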


Ly, The Anh wrote on Friday, November 2, 2018, 5:22 PM:

> Yes, I did. It is definitely there. I made a separate Maven project to test
> whether something was wrong with my jar. The resulting shaded jar of that
> test project was fine, and the message-buffer process was running with that
> test jar.
>
>
> On 02.11.2018 04:47, Yun Tang wrote:
> Hi
>
> Since you use the message-buffer-process as a dependency and the error
> tells you the class was not found, have you checked whether your application
> jar actually contains the wanted MessageBufferProcess.class? If it does not,
> try to use the maven-assembly or maven-shade plugin to include
> your classes.
>
> Best
> Yun Tang
> --
> *From:* Ly, The Anh 
> *Sent:* Friday, November 2, 2018 6:33
> *To:* user@flink.apache.org
> *Subject:* Starting a separate Java process within a Flink cluster
>
>
> Hello,
>
>
> I am currently working on my master's and I have encountered a difficult
> problem.
>
>
> Background (for context): I am trying to connect different data stream
> processors. Therefore I am using Flink's internal mechanisms for creating
> custom sinks and sources to receive from and send to different data stream
> processors. I am starting a separate process (message-buffer-process) in
> those custom sinks and sources to communicate with and buffer data into that
> message-buffer-process. My implementation is built with Maven and could
> potentially be added as a dependency.
>
>
> Problem: I already tested my implementation by adding it as a dependency
> to a simple Flink word-count example. The test within the IDE works
> perfectly fine. But when I package that Flink word-count example and try
> to run it with "./flink run " or by uploading and submitting it as a job,
> it tells me that my buffer-process class could not be found:
>
> In German: "Fehler: Hauptklasse
> de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess konnte nicht
> gefunden oder geladen werden"
>
> Roughly translated: "Error: Main class
> de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess could not be
> found or loaded"
>
>
> Code snippets:
>
> Example - Adding my custom sink to send data to another data stream
> processor:
>
> dataStream.addSink(
>   (SinkFunction)DSPConnectorFactory
>   .getInstance()
>   .createSinkConnector(
>   new DSPConnectorConfig
>   .Builder("localhost", 9656)
>   .withDSP("flink")
>   
> .withBufferConnectorString("buffer-connection-string")
>   .withHWM(20)
>   .withTimeout(1)
>   .build()));
>
>
>
> The way I am trying to start the separate buffer process:
>
>     JavaProcessBuilder.exec(MessageBufferProcess.class, connectionString, addSentMessagesFrame);
>
> Here is what JavaProcessBuilder.exec looks like:
>
>     public static Process exec(Class javaClass, String connectionString,
>             boolean addSentMessagesFrame) throws IOException, InterruptedException {
>         String javaHome = System.getProperty("java.home");
>         String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
>         String classpath = System.getProperty("java.class.path");
>         String className = javaClass.getCanonicalName();
>
>         System.out.println("Trying to build process " + classpath + " " + className);
>
>         ProcessBuilder builder = new ProcessBuilder(
>                 javaBin, "-cp", classpath, className, connectionString,
>                 Boolean.toString(addSentMessagesFrame));
>
>         builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
>         builder.redirectError(ProcessBuilder.Redirect.INHERIT);
>
>         Process process = builder.start();
>         return process;
>     }
>
> I also tried running that message-buffer process separately from another Maven
> project with its packaged .jar file. That worked perfectly fine too, which is
> why I am assuming that my approach is not appropriate for running in Flink.
> Did I miss something, or does my approach simply not work within Flink's
> context? I hope the information I gave you is sufficient to understand my
> issue. If you need any more information, feel free to message me!
>
> Thanks for any help!
>
>  With best regards
>
>
>


How to add flink table jar to classpath via bin/flink

2018-09-13 Thread Jeff Zhang
Because flink-table is a provided dependency, it won't be included in
the final shaded jar. I didn't find a way to add a custom jar to the classpath
via bin/flink; does anyone know how? Thanks
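
Two hedged pointers that may or may not apply to this exact Flink version: the
flink-table jar is shipped under opt/ in the Flink distribution and is commonly
copied into lib/ so that it ends up on the cluster classpath, and bin/flink run
has a -C/--classpath option that adds a URL to the user-code classloaders on all
nodes, provided the URL is reachable from every node. The paths below are
illustrative placeholders only:

cp opt/flink-table_2.11-<flink-version>.jar lib/
bin/flink run -C file:///path/to/extra-dependency.jar path/to/your-job.jar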


Can not run scala-shell in yarn mode in flink 1.5

2018-06-05 Thread Jeff Zhang
I tried to run the scala-shell in YARN mode in 1.5, but hit the following error.
I can run it successfully in 1.4.2. The result is the same even when I change
the mode to legacy. Is this a known issue, or has something changed in 1.5? Thanks

Command I Use: bin/start-scala-shell.sh yarn -n 1


Starting Flink Shell:
2018-06-06 12:30:02,672 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.address, localhost
2018-06-06 12:30:02,673 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.rpc.port, 6123
2018-06-06 12:30:02,674 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: jobmanager.heap.mb, 1024
2018-06-06 12:30:02,674 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.heap.mb, 1024
2018-06-06 12:30:02,674 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2018-06-06 12:30:02,674 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: parallelism.default, 1
2018-06-06 12:30:02,675 INFO
org.apache.flink.configuration.GlobalConfiguration- Loading
configuration property: rest.port, 8081
Exception in thread "main" java.lang.UnsupportedOperationException: Can't
deploy a standalone cluster.
at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:57)
at
org.apache.flink.client.deployment.StandaloneClusterDescriptor.deploySessionCluster(StandaloneClusterDescriptor.java:31)
at
org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:272)
at
org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:164)
at
org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:194)
at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:193)
at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:135)
at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)