[jira] [Created] (FLINK-15218) java.lang.NoClassDefFoundError: org/apache/flink/table/sources/TableSource

2019-12-11 Thread tyler fan (Jira)
tyler fan created FLINK-15218:
-

 Summary: java.lang.NoClassDefFoundError: org/apache/flink/table/sources/TableSource
 Key: FLINK-15218
 URL: https://issues.apache.org/jira/browse/FLINK-15218
 Project: Flink
  Issue Type: Bug
 Environment: IDEA 2019.2, JDK 1.8, no local Flink environment
Reporter: tyler fan


I am trying to develop an application in the IDE.

I wrote a simple app that reads a CSV file and registers it as a table:



final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

String[] colname = new String[] {"CountryName","CountryCode","IndicatorName","IndicatorCode","1960"};
TypeInformation[] coltype = new TypeInformation[] {Types.STRING,Types.STRING,Types.STRING,Types.STRING,Types.STRING};

TableSource dds = new CsvTableSource("/home/tylerf/sampledata/gpd.csv",colname,coltype);
tableEnv.registerTableSource("gpd", dds);
Table tab = tableEnv.scan("gpd");

 

-

When I run the sample in IDEA, I get the following error:



java.lang.NoClassDefFoundError: org/apache/flink/table/sources/TableSource



The dependencies in the pom are configured as described in the 1.9 documentation:

-


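<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>${flink.version}</version>
 <scope>provided</scope>
</dependency>

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
 <scope>provided</scope>
</dependency>

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-api-java-bridge_2.11</artifactId>
 <version>1.9.0</version>
 <scope>provided</scope>
</dependency>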


-
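A NoClassDefFoundError generally means a class was available at compile time but could not be loaded at run time. One common cause with a pom like the one above is that dependencies in `provided` scope are not placed on the run classpath by the IDE; whether that is the cause here is an assumption, not something this report confirms. The following minimal sketch checks whether the classes named in the report can be loaded at run time. The class name `ClasspathCheck` and its helper methods are hypothetical and only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class ClasspathCheck {

    // Returns true if the named class can be located by this class's class loader.
    // The class is looked up without being initialized.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Returns the subset of the given class names that cannot be loaded.
    static List<String> missing(String... classNames) {
        List<String> result = new ArrayList<>();
        for (String name : classNames) {
            if (!isOnClasspath(name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Class names taken from the error message and the sample code in the report.
        List<String> absent = missing(
                "org.apache.flink.table.sources.TableSource",
                "org.apache.flink.table.sources.CsvTableSource");
        for (String name : absent) {
            System.out.println("missing at run time: " + name);
        }
    }
}
```

If the classes are reported missing only when launched from the IDE, the run configuration's classpath (rather than the application code) is the likely place to look.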

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15217) 'java.time.LocalDate' should support for the CSV input format.

2019-12-11 Thread xiaojin.wy (Jira)
xiaojin.wy created FLINK-15217:
--

 Summary: 'java.time.LocalDate' should support for the CSV input format.
 Key: FLINK-15217
 URL: https://issues.apache.org/jira/browse/FLINK-15217
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.10.0
Reporter: xiaojin.wy


*The SQL is:*

CREATE TABLE `DATE_TBL` (
 > f1 date
 > ) WITH (
 > 'format.field-delimiter'='|',
 > 'connector.type'='filesystem',
 > 'format.derive-schema'='true',
 > 'connector.path'='hdfs://zthdev/defender_test_data/daily/test_date/sources/DATE_TBL.csv',
 > 'format.type'='csv'
 > );

SELECT f1 AS Fifteen FROM DATE_TBL;

 

*After executing the SQL, the following exception is thrown:*

[ERROR] Could not execute SQL statement. Reason:
 java.lang.IllegalArgumentException: The type 'java.time.LocalDate' is not supported for the CSV input format.

*The input file's content is:*

1957-04-09
1957-06-13
1996-02-28
1996-02-29
1996-03-01
1996-03-02
1997-02-28
1997-03-01
1997-03-02
2000-04-01
2000-04-02
2000-04-03
2038-04-08
2039-04-09
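The values in the input file are plain ISO-8601 dates, which `java.time.LocalDate.parse` accepts directly. As a rough sketch of what per-field date parsing for such a file could look like (this is not the CSV input format's actual code, and the class and method names below are hypothetical), each non-empty line is trimmed and parsed:

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CsvDateColumn {

    // Parses one date per line; surrounding whitespace is trimmed and blank lines are skipped.
    // LocalDate.parse throws DateTimeParseException for text it cannot parse as an ISO date.
    static List<LocalDate> parseDates(List<String> lines) {
        List<LocalDate> dates = new ArrayList<>();
        for (String line : lines) {
            String field = line.trim();
            if (field.isEmpty()) {
                continue;
            }
            dates.add(LocalDate.parse(field));
        }
        return dates;
    }

    public static void main(String[] args) {
        // A few of the sample values from the input file in the report.
        List<String> sample = Arrays.asList("1957-04-09", "1996-02-29", "2039-04-09");
        for (LocalDate date : parseDates(sample)) {
            System.out.println(date);
        }
    }
}
```

A possible interim workaround, stated here only as an assumption, would be to declare the column as a string in the table definition and convert it to a date in the query or in user code along these lines.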

 

*The whole exception is:*

Caused by: java.lang.IllegalArgumentException: The type 'java.time.LocalDate' is not supported for the CSV input format.
 at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldsGeneric(GenericCsvInputFormat.java:289)
 at org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:64)
 at org.apache.flink.table.sources.CsvTableSource$CsvInputFormatConfig.createInputFormat(CsvTableSource.java:518)
 at org.apache.flink.table.sources.CsvTableSource.getDataStream(CsvTableSource.java:182)
 at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:97)
 at org.apache.flink.table.planner.StreamPlanner.translateToCRow(StreamPlanner.scala:251)
 at org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:410)
 at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:400)
 at org.apache.flink.table.planner.StreamPlanner.writeToRetractSink(StreamPlanner.scala:308)
 at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$writeToSink(StreamPlanner.scala:272)
 at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:166)
 at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:145)
 at scala.Option.map(Option.scala:146)
 at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:145)
 at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117)
 at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:680)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:353)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:341)
 at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:428)
 at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeQueryInternal$12(LocalExecutor.java:640)
 at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:227)
 at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:638)

 

 





Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Wei Zhong
Thanks Hequn for being the release manager. Great work!

Best,
Wei

> On Dec 12, 2019, at 15:27, Jingsong Li wrote:
> 
> Thanks Hequn for your driving, 1.8.3 fixed a lot of issues and it is very 
> useful to users.
> Great work!
> 
> Best,
> Jingsong Lee
> 
> On Thu, Dec 12, 2019 at 3:25 PM jincheng sun wrote:
> Thanks for being the release manager and the great work Hequn :)
> Also thanks to the community making this release possible!
> 
> Best,
> Jincheng
> 
> On Thu, Dec 12, 2019 at 3:23 PM Jark Wu wrote:
> Thanks Hequn for helping out this release and being the release manager.
> Great work!
> 
> Best,
> Jark
> 
> On Thu, 12 Dec 2019 at 15:02, Jeff Zhang wrote:
> 
> > Great work, Hequn
> >
> > On Thu, Dec 12, 2019 at 2:32 PM Dian Fu wrote:
> >
> >> Thanks Hequn for being the release manager and everyone who contributed
> >> to this release.
> >>
> >> Regards,
> >> Dian
> >>
> >> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
> >>
> >> Hi,
> >>
> >> The Apache Flink community is very happy to announce the release of
> >> Apache Flink 1.8.3, which is the third bugfix release for the Apache Flink
> >> 1.8 series.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> >> distributed, high-performing, always-available, and accurate data streaming
> >> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html 
> >>
> >> Please check out the release blog post for an overview of the
> >> improvements for this bugfix release:
> >> https://flink.apache.org/news/2019/12/11/release-1.8.3.html 
> >>
> >> The full release notes are available in Jira:
> >> https://issues.apache.org/jira/projects/FLINK/versions/12346112 
> >>
> >> We would like to thank all contributors of the Apache Flink community who
> >> made this release possible!
> >> Great thanks to @Jincheng as a mentor during this release.
> >>
> >> Regards,
> >> Hequn
> >>
> >>
> >>
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
> 
> 
> -- 
> Best, Jingsong Lee



Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Jingsong Li
Thanks Hequn for your driving, 1.8.3 fixed a lot of issues and it is very
useful to users.
Great work!

Best,
Jingsong Lee

On Thu, Dec 12, 2019 at 3:25 PM jincheng sun wrote:

> Thanks for being the release manager and the great work Hequn :)
> Also thanks to the community making this release possible!
>
> Best,
> Jincheng
>
> On Thu, Dec 12, 2019 at 3:23 PM Jark Wu wrote:
>
>> Thanks Hequn for helping out this release and being the release manager.
>> Great work!
>>
>> Best,
>> Jark
>>
>> On Thu, 12 Dec 2019 at 15:02, Jeff Zhang  wrote:
>>
>> > Great work, Hequn
>> >
>> > On Thu, Dec 12, 2019 at 2:32 PM Dian Fu wrote:
>> >
>> >> Thanks Hequn for being the release manager and everyone who contributed
>> >> to this release.
>> >>
>> >> Regards,
>> >> Dian
>> >>
>> >> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
>> >>
>> >> Hi,
>> >>
>> >> The Apache Flink community is very happy to announce the release of
>> >> Apache Flink 1.8.3, which is the third bugfix release for the Apache Flink
>> >> 1.8 series.
>> >>
>> >> Apache Flink® is an open-source stream processing framework for
>> >> distributed, high-performing, always-available, and accurate data streaming
>> >> applications.
>> >>
>> >> The release is available for download at:
>> >> https://flink.apache.org/downloads.html
>> >>
>> >> Please check out the release blog post for an overview of the
>> >> improvements for this bugfix release:
>> >> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>> >>
>> >> The full release notes are available in Jira:
>> >> https://issues.apache.org/jira/projects/FLINK/versions/12346112
>> >>
>> >> We would like to thank all contributors of the Apache Flink community who
>> >> made this release possible!
>> >> Great thanks to @Jincheng as a mentor during this release.
>> >>
>> >> Regards,
>> >> Hequn
>> >>
>> >>
>> >>
>> >
>> > --
>> > Best Regards
>> >
>> > Jeff Zhang
>> >
>>
>

-- 
Best, Jingsong Lee


Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread jincheng sun
Thanks for being the release manager and the great work Hequn :)
Also thanks to the community making this release possible!

Best,
Jincheng

On Thu, Dec 12, 2019 at 3:23 PM Jark Wu wrote:

> Thanks Hequn for helping out this release and being the release manager.
> Great work!
>
> Best,
> Jark
>
> On Thu, 12 Dec 2019 at 15:02, Jeff Zhang  wrote:
>
> > Great work, Hequn
> >
> > On Thu, Dec 12, 2019 at 2:32 PM Dian Fu wrote:
> >
> >> Thanks Hequn for being the release manager and everyone who contributed
> >> to this release.
> >>
> >> Regards,
> >> Dian
> >>
> >> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
> >>
> >> Hi,
> >>
> >> The Apache Flink community is very happy to announce the release of
> >> Apache Flink 1.8.3, which is the third bugfix release for the Apache Flink
> >> 1.8 series.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> >> distributed, high-performing, always-available, and accurate data streaming
> >> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html
> >>
> >> Please check out the release blog post for an overview of the
> >> improvements for this bugfix release:
> >> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
> >>
> >> The full release notes are available in Jira:
> >> https://issues.apache.org/jira/projects/FLINK/versions/12346112
> >>
> >> We would like to thank all contributors of the Apache Flink community who
> >> made this release possible!
> >> Great thanks to @Jincheng as a mentor during this release.
> >>
> >> Regards,
> >> Hequn
> >>
> >>
> >>
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>


Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Jark Wu
Thanks Hequn for helping out this release and being the release manager.
Great work!

Best,
Jark

On Thu, 12 Dec 2019 at 15:02, Jeff Zhang  wrote:

> Great work, Hequn
>
> On Thu, Dec 12, 2019 at 2:32 PM Dian Fu wrote:
>
>> Thanks Hequn for being the release manager and everyone who contributed
>> to this release.
>>
>> Regards,
>> Dian
>>
>> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
>>
>> Hi,
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.8.3, which is the third bugfix release for the Apache Flink
>> 1.8 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/projects/FLINK/versions/12346112
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>> Great thanks to @Jincheng as a mentor during this release.
>>
>> Regards,
>> Hequn
>>
>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


[jira] [Created] (FLINK-15216) Can't use rocksdb with hdfs filesystem with flink-s3-fs-hadoop

2019-12-11 Thread Tank (Jira)
Tank created FLINK-15216:


 Summary: Can't use rocksdb with hdfs filesystem with flink-s3-fs-hadoop
 Key: FLINK-15216
 URL: https://issues.apache.org/jira/browse/FLINK-15216
 Project: Flink
  Issue Type: Bug
  Components: FileSystems
Affects Versions: 1.9.0
Reporter: Tank


Setup:
Flink 1.9.0 packaged with EMR 5.28 (uses hadoop 2.8.5)

After adding flink-s3-fs-hadoop to the plugins or libs directory, I was not 
able to use rocksdb with hdfs filesystem.

 

To replicate:

 

Add rocksdb checkpoint & savepoint configuration to 
/etc/flink/conf/flink-conf.yaml

state.backend: rocksdb
state.backend.incremental: true
state.savepoints.dir: hdfs://savepoints/
state.checkpoints.dir: hdfs:///checkpoints/

 

Copy flink-s3-fs-hadoop to the plugins directory:

 

{{cp ./opt/flink-s3-fs-hadoop-1.9.0.jar ./plugins/s3-fs-hadoop/}}

 

Start any job:

 

flink run -m yarn-cluster -yid ${app_id} -yn 1 -p 1 /usr/lib/flink/examples/streaming/WordCount.jar

 

Looks like the hadoop classes packaged in flink-s3-fs-hadoop are interfering 
with flink runtime.

{{org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: b2d8b907839d82dda5e7407248941d4d)
 at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
 at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
 at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
 at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
 at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
 at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
 at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
 at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
 at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
 at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
 at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
 at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
 at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
 at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., (JobManagerRunner.java:152)
 at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
 at

Re: [RESULT] [VOTE] Release 1.8.3, release candidate #3

2019-12-11 Thread Hequn Cheng
Hi Patrick,

The release has been announced.

+1 to integrate the publication of Docker images into the Flink release
process. Thus we can leverage the current release procedure for the docker
image.
Looking forward to the proposal.

Best, Hequn

On Thu, Dec 12, 2019 at 1:52 PM Yang Wang  wrote:

> Hi Lucas,
>
> It would be great if we could integrate the publication of the official Flink
> Docker images into the Flink release process, since many users are using or
> starting to use Flink in container environments.
>
>
> Best,
> Yang
>
> On Wed, Dec 11, 2019 at 11:44 PM Patrick Lucas wrote:
>
> > Thanks, Hequn!
> >
> > The Dockerfiles for the Flink images on Docker Hub for the 1.8.3 release
> > are prepared[1] and I'll open a pull request upstream[2] once the release
> > announcement has gone out.
> >
> > And stay tuned: I'm working on a proposal for integrating the publication
> > of these Docker images into the Flink release process and will send it out
> > to the dev list before or shortly after the holidays.
> >
> > [1] https://github.com/docker-flink/docker-flink/commit/4d85b71b7cf9fa4a38f8682ed13aa0f55445e32e
> > [2] https://github.com/docker-library/official-images/
> >
> > On Wed, Dec 11, 2019 at 3:30 AM Hequn Cheng wrote:
> >
> > > Hi everyone,
> > >
> > > I'm happy to announce that we have unanimously approved this release.
> > >
> > > There are 10 approving votes, 3 of which are binding:
> > > * Jincheng (binding)
> > > * Ufuk (binding)
> > > * Till (binding)
> > > * Jingsong
> > > * Fabian
> > > * Danny
> > > * Yang Wang
> > > * Dian
> > > * Wei
> > > * Hequn
> > >
> > > There are no disapproving votes.
> > >
> > > Thanks everyone!
> > >
> >
>


[jira] [Created] (FLINK-15215) Not able to provide a custom AWS credentials provider with flink-s3-fs-hadoop

2019-12-11 Thread Arjun Prakash (Jira)
Arjun Prakash created FLINK-15215:
-

 Summary: Not able to provide a custom AWS credentials provider with flink-s3-fs-hadoop
 Key: FLINK-15215
 URL: https://issues.apache.org/jira/browse/FLINK-15215
 Project: Flink
  Issue Type: Bug
  Components: FileSystems
Affects Versions: 1.9.0
Reporter: Arjun Prakash


I am using Flink 1.9.0 on EMR (emr 5.28), with StreamingFileSink using an S3 
filesystem.

I want flink to write to an S3 bucket which is running in another AWS account, 
and I want to do that by assuming a role in the other account. This can be 
easily accomplished by providing a custom credential provider (similar to 
[https://aws.amazon.com/blogs/big-data/securely-analyze-data-from-another-aws-account-with-emrfs/])

 

As described [here|#pluggable-file-systems], I copied 
{{flink-s3-fs-hadoop-1.9.0.jar}} to the plugins directory. But the 
configuration parameter 'fs.s3a.aws.credentials.provider' is getting shaded 
([https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java#L47]),
and so are all the AWS SDK dependencies, so when I provide a custom credential 
provider, it complains that I am not implementing the correct interface 
(AWSCredentialsProvider).

The fix made in https://issues.apache.org/jira/browse/FLINK-13044 allows users 
to use one of the built-in credential providers like 
`InstanceProfileCredentialsProvider`, but still does not help with providing 
custom credential providers.

 

Related: https://issues.apache.org/jira/browse/FLINK-13602





Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Jeff Zhang
Great work, Hequn

On Thu, Dec 12, 2019 at 2:32 PM Dian Fu wrote:

> Thanks Hequn for being the release manager and everyone who contributed to
> this release.
>
> Regards,
> Dian
>
> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
>
> Hi,
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.8.3, which is the third bugfix release for the Apache Flink 1.8
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12346112
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
> Great thanks to @Jincheng as a mentor during this release.
>
> Regards,
> Hequn
>
>
>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Dian Fu
Thanks Hequn for being the release manager and everyone who contributed to this 
release.

Regards,
Dian

> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
> 
> Hi,
>  
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.8.3, which is the third bugfix release for the Apache Flink 1.8 
> series.
>  
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
>  
> The release is available for download at:
> https://flink.apache.org/downloads.html 
>  
> Please check out the release blog post for an overview of the improvements 
> for this bugfix release:
> https://flink.apache.org/news/2019/12/11/release-1.8.3.html 
>  
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12346112 
>  
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible!
> Great thanks to @Jincheng as a mentor during this release.
>  
> Regards,
> Hequn 



[jira] [Created] (FLINK-15214) Adding multiple submission e2e test for Flink's Mesos integration

2019-12-11 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-15214:
--

 Summary: Adding multiple submission e2e test for Flink's Mesos integration
 Key: FLINK-15214
 URL: https://issues.apache.org/jira/browse/FLINK-15214
 Project: Flink
  Issue Type: Test
  Components: Tests
Reporter: Yangze Guo
 Fix For: 1.11.0


As discussed, we need an e2e test to verify the user's flow of submitting 
multiple jobs, in which the second job should reuse the slots of the first job.

More details can be found [in the ML|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-e2e-tests-for-Flink-s-Mesos-integration-td35660.html].





[ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Hequn Cheng
Hi,

The Apache Flink community is very happy to announce the release of Apache
Flink 1.8.3, which is the third bugfix release for the Apache Flink 1.8
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2019/12/11/release-1.8.3.html

The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12346112

We would like to thank all contributors of the Apache Flink community who
made this release possible!
Great thanks to @Jincheng as a mentor during this release.

Regards,
Hequn


[jira] [Created] (FLINK-15213) The conversion between java.sql.Timestamp and long is not asymmetric

2019-12-11 Thread Zhenghua Gao (Jira)
Zhenghua Gao created FLINK-15213:


 Summary: The conversion between java.sql.Timestamp and long is not asymmetric
 Key: FLINK-15213
 URL: https://issues.apache.org/jira/browse/FLINK-15213
 Project: Flink
  Issue Type: Bug
Reporter: Zhenghua Gao


In Calcite, we use SqlFunctions.toLong(Timestamp) and 
SqlFunctions.internalToTimestamp(long) to convert java.sql.Timestamp to the 
internal long representation and back. The core logic of both is to add or 
subtract the local time zone offset.

But according to the Javadoc of TimeZone.getOffset(long date), the parameter is 
expressed in milliseconds since January 1, 1970 00:00:00 GMT. This means that 
one of the two conversions above passes a value that does not satisfy this 
assumption.

 

This causes surprising results for users:

(1) some Daylight Saving Time changes:

 
{code:java}
@Test public void testDayLightingSaving() {
 TimeZone tz = TimeZone.getDefault();
 TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"));

 java.sql.Timestamp dst2018Begin = java.sql.Timestamp.valueOf("2018-03-11 03:00:00");

 assertThat(dst2018Begin, is(internalToTimestamp(toLong(dst2018Begin))));

 TimeZone.setDefault(tz);
}{code}
fails with:
{code:java}
java.lang.AssertionError: 
Expected: is <2018-03-11 04:00:00.0>
 but: was <2018-03-11 03:00:00.0>
Expected :is <2018-03-11 04:00:00.0>
Actual :<2018-03-11 03:00:00.0>{code}
 

(2) "1900-01-01 00:00:00" Changes in some TimeZone
{code:java}
@Test public void test() {
 TimeZone tz = TimeZone.getDefault();
 TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));

 java.sql.Timestamp ts = java.sql.Timestamp.valueOf("1900-01-01 00:00:00");
 assertThat(ts, is(internalToTimestamp(toLong(ts))));
 TimeZone.setDefault(tz);
}{code}
fails with
{code:java}
java.lang.AssertionError: 
Expected: is <1899-12-31 23:54:17.0>
 but: was <1900-01-01 00:00:00.0>
Expected :is <1899-12-31 23:54:17.0>
Actual :<1900-01-01 00:00:00.0>
{code}
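The mismatch can be reproduced with the JDK alone. The sketch below is an assumption about the shape of the two conversions (add the zone offset in one direction, subtract it in the other), not a copy of Calcite's code; the class and method names are hypothetical. The point it illustrates is that the reverse direction calls TimeZone.getOffset with a value that is already shifted to local time rather than a true instant since the epoch.

```java
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.TimeZone;

final class TimestampRoundTrip {

    // Epoch millis -> "local" millis: add the zone offset valid at that UTC instant.
    static long toInternal(long epochMillis, TimeZone tz) {
        return epochMillis + tz.getOffset(epochMillis);
    }

    // "Local" millis -> epoch millis: subtract the zone offset.
    // Note: getOffset expects a UTC instant but receives local millis here,
    // so around an offset change it may look up the wrong offset.
    static long fromInternal(long localMillis, TimeZone tz) {
        return localMillis - tz.getOffset(localMillis);
    }

    // Difference in millis between the round-tripped value and the original instant.
    static long roundTripDriftMillis(ZonedDateTime time) {
        TimeZone tz = TimeZone.getTimeZone(time.getZone());
        long epochMillis = time.toInstant().toEpochMilli();
        return fromInternal(toInternal(epochMillis, tz), tz) - epochMillis;
    }

    public static void main(String[] args) {
        ZoneId la = ZoneId.of("America/Los_Angeles");
        // Just after the 2018 DST start in Los Angeles (case 1 in the report).
        ZonedDateTime dstBegin = ZonedDateTime.of(2018, 3, 11, 3, 0, 0, 0, la);
        // A time far away from any offset change.
        ZonedDateTime summer = ZonedDateTime.of(2018, 6, 1, 12, 0, 0, 0, la);
        System.out.println("drift at DST begin (ms): " + roundTripDriftMillis(dstBegin));
        System.out.println("drift in summer (ms): " + roundTripDriftMillis(summer));
    }
}
```

Passing the time zone explicitly avoids changing the JVM default, which the tests in the report have to do and restore by hand.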
 

 

 





Re: [RESULT] [VOTE] Release 1.8.3, release candidate #3

2019-12-11 Thread Yang Wang
Hi Lucas,

It would be great if we could integrate the publication of the official Flink
Docker images into the Flink release process, since many users are using or
starting to use Flink in container environments.


Best,
Yang

On Wed, Dec 11, 2019 at 11:44 PM Patrick Lucas wrote:

> Thanks, Hequn!
>
> The Dockerfiles for the Flink images on Docker Hub for the 1.8.3 release
> are prepared[1] and I'll open a pull request upstream[2] once the release
> announcement has gone out.
>
> And stay tuned: I'm working on a proposal for integrating the publication
> of these Docker images into the Flink release process and will send it out
> to the dev list before or shortly after the holidays.
>
> [1] https://github.com/docker-flink/docker-flink/commit/4d85b71b7cf9fa4a38f8682ed13aa0f55445e32e
> [2] https://github.com/docker-library/official-images/
>
> On Wed, Dec 11, 2019 at 3:30 AM Hequn Cheng  wrote:
>
> > Hi everyone,
> >
> > I'm happy to announce that we have unanimously approved this release.
> >
> > There are 10 approving votes, 3 of which are binding:
> > * Jincheng (binding)
> > * Ufuk (binding)
> > * Till (binding)
> > * Jingsong
> > * Fabian
> > * Danny
> > * Yang Wang
> > * Dian
> > * Wei
> > * Hequn
> >
> > There are no disapproving votes.
> >
> > Thanks everyone!
> >
>


Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-11 Thread Jingsong Li
Hi Becket,

I also have some performance concerns.

If I understand correctly, SourceOutput will emit data into the queue record
by record? I'm worried about the multithreading performance of this queue.

> One example is some batched messaging systems which only have an offset
> for the entire batch instead of individual messages in the batch.

As you said, there are batch-oriented sources, such as Parquet/ORC sources.
Could we have a batch emit interface to improve performance? A per-record
queue may cause performance degradation.
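To make the concern concrete, here is a small sketch contrasting a per-record emit interface with a batch emit interface. All names (`RecordOutput`, `BatchOutput`, `CountingQueue`) are hypothetical and are not part of the FLIP-27 proposal; the counter only stands in for the cost of one hand-off to a shared queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BatchEmitSketch {

    // Hypothetical per-record output: one hand-off per record.
    interface RecordOutput<T> {
        void collect(T record);
    }

    // Hypothetical batch output: one hand-off per batch.
    interface BatchOutput<T> {
        void collectBatch(List<T> batch);
    }

    // Stand-in for the hand-over queue; counts how often it is touched.
    static final class CountingQueue<T> implements RecordOutput<T>, BatchOutput<T> {
        final List<T> records = new ArrayList<>();
        int handOffs = 0;

        @Override
        public void collect(T record) {
            records.add(record);
            handOffs++;
        }

        @Override
        public void collectBatch(List<T> batch) {
            records.addAll(batch);
            handOffs++;
        }
    }

    static <T> void emitPerRecord(List<T> batch, RecordOutput<T> out) {
        for (T record : batch) {
            out.collect(record);
        }
    }

    static <T> void emitAsBatch(List<T> batch, BatchOutput<T> out) {
        out.collectBatch(batch);
    }

    public static void main(String[] args) {
        List<String> rowGroup = Arrays.asList("r1", "r2", "r3");

        CountingQueue<String> perRecord = new CountingQueue<>();
        emitPerRecord(rowGroup, perRecord);

        CountingQueue<String> batched = new CountingQueue<>();
        emitAsBatch(rowGroup, batched);

        System.out.println("per-record hand-offs: " + perRecord.handOffs);
        System.out.println("batch hand-offs: " + batched.handOffs);
    }
}
```

Both variants deliver the same records; they differ only in how many times the shared structure is touched, which is where contention between threads would show up.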

Best,
Jingsong Lee

On Thu, Dec 12, 2019 at 9:15 AM Jark Wu  wrote:

> Hi Becket,
>
> I think Dawid explained things clearly and it makes a lot of sense.
> I'm also in favor of #2, because #1 doesn't work for our future unified
> environment.
>
> You can see the vision in this documentation [1]. In the future, we would
> like to drop the global streaming/batch mode in SQL (i.e.
> EnvironmentSettings#inStreamingMode/inBatchMode).
> A source is bounded or unbounded once defined, so queries can be inferred
> from the source to run in streaming, batch, or hybrid mode. However, in #1, we
> will lose this ability because the framework doesn't know whether the source
> is bounded or unbounded.
>
> Best,
> Jark
>
>
> [1]: https://docs.google.com/document/d/1yrKXEIRATfxHJJ0K3t6wUgXAtZq8D-XgvEnvl2uUcr0/edit#heading=h.v4ib17buma1p
>
> On Wed, 11 Dec 2019 at 20:52, Piotr Nowojski  wrote:
>
> > Hi,
> >
> > Regarding the:
> >
> > Collection getNextRecords()
> >
> > I’m pretty sure such design would unfortunately impact the performance
> > (accessing and potentially creating the collection on the hot path).
> >
> > Also the
> >
> > InputStatus emitNext(DataOutput output) throws Exception;
> > or
> > Status pollNext(SourceOutput sourceOutput) throws Exception;
> >
> > give us some opportunities in the future to allow the Source to hot-loop
> > inside until it receives a signal such as “please exit for some reason”
> > (the output collector could return such a hint upon collecting the result). But
> > that’s another topic outside of this FLIP’s scope.
> >
> > Piotrek
> >
> > > On 11 Dec 2019, at 10:41, Till Rohrmann  wrote:
> > >
> > > Hi Becket,
> > >
> > > quick clarification from my side because I think you misunderstood my
> > > question. I did not suggest letting the SourceReader return only a single
> > > record at a time when calling getNextRecords. As the return type indicates,
> > > the method can return an arbitrary number of records.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Dec 11, 2019 at 10:13 AM Dawid Wysakowicz <
> > dwysakow...@apache.org >
> > > wrote:
> > >
> > >> Hi Becket,
> > >>
> > >> Issue #1 - Design of Source interface
> > >>
> > >> I mentioned the lack of a method like
> > Source#createEnumerator(Boundedness
> > >> boundedness, SplitEnumeratorContext context), because without it the
> > > current
> > >> proposal is not complete/does not work.
> > >>
> > >> If we say that boundedness is an intrinsic property of a source imo we
> > >> don't need the Source#createEnumerator(Boundedness boundedness,
> > >> SplitEnumeratorContext context) method.
> > >>
> > >> Assuming a source from my previous example:
> > >>
> > >> Source source = KafkaSource.builder()
> > >>  ...
> > >>  .untilTimestamp(...)
> > >>  .build()
> > >>
> > >> Would the enumerator differ if created like
> > >> source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs source
> > >> .createEnumerator(BOUNDED, ...)? I know I am repeating myself, but
> this
> > is
> > >> the part where my opinion differs the most from the current proposal. I
> > >> really think it should always be the source that tells if it is
> bounded
> > or
> > >> not. In the current proposal methods continousSource/boundedSource
> > somewhat
> > >> reconfigure the source, which I think is misleading.
> > >>
> > >> I think a call like:
> > >>
> > >> Source source = KafkaSource.builder()
> > >>  ...
> > >>  .readContinously() / readUntilLatestOffset() / readUntilTimestamp /
> > readUntilOffsets / ...
> > >>  .build()
> > >>
> > >> is way cleaner (and expressive) than
> > >>
> > >> Source source = KafkaSource.builder()
> > >>  ...
> > >>  .build()
> > >>
> > >>
> > >> env.continousSource(source) // which actually underneath would call
> > createEnumerator(CONTINUOUS, ctx) which would be equivalent to
> > source.readContinously().createEnumerator(ctx)
> > >> // or
> > >> env.boundedSource(source) // which actually underneath would call
> > createEnumerator(BOUNDED, ctx) which would be equivalent to
> > source.readUntilLatestOffset().createEnumerator(ctx)
> > >>
> > >>
> > >> Sorry for the comparison, but to me it seems there is too much magic
> > >> happening underneath those two calls.
> > >>
> > >> I really believe the Source interface should have getBoundedness
> method
> > >> instead of (supportBoundedness) + createEnumerator(Boundedness, ...)
> > >>
> > >>
> > >> Issue #2 - Design of
> > >> 

Re: [DISCUSS] Overwrite and partition inserting support in 1.10

2019-12-11 Thread Danny Chan
Thanks Jingsong for bringing up this discussion ~

After reviewing FLIP-63, it seems that we have made a conclusion for the syntax

- INSERT OVERWRITE ...
- INSERT INTO … PARTITION

Which means that they should not have the Hive dialect limitation, so I’m 
inclined to think that the current SQL-CLI behavior is unexpected, or a “bug” 
that needs to be fixed.

We did not make a conclusion for the syntax:

- CREATE TABLE … PARTITIONED BY ...

Which means that its behavior is still under discussion, so it is okay to be 
without the HIVE dialect limitation; we do not actually have any table 
sources/sinks that support such a DDL, so for the current code base users 
should not be affected by the behavior change.

So I’m

+1 to remove the Hive dialect limitations for INSERT OVERWRITE and INSERT 
PARTITION
+0 to add a yaml dialect conf to SQL-CLI; because FLIP-89 is not finished yet, 
we had better do this after FLIP-89 is resolved.

Best,
Danny Chan
On Dec 11, 2019, 5:29 PM +0800, Jingsong Li wrote:
> Hi Dev,
>
> After cutting out the branch of 1.10, I tried the following functions of
> SQL-CLI and found that it does not support:
> - insert overwrite
> - PARTITION (partcol1=val1, partcol2=val2 ...)
> The SQL pattern is:
> INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1,
> partcol2=val2 ...)] select_statement1 FROM from_statement;
> It is a surprise to me.
> The reason is that we only allow these two grammars in hive dialect. And
> SQL-CLI does not have an interface to switch dialects.
>
> Because it directly hinders the SQL-CLI's insert syntax in hive integration
> and seriously hinders the practicability of SQL-CLI.
> And we have introduced these two grammars in FLIP-63 [1] to Flink.
> Here are my questions:
> 1.Should we remove hive dialect limitation for these two grammars?
> 2.Should we fix this in 1.10?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
>
> Best,
> Jingsong Lee


[jira] [Created] (FLINK-15212) PROCTIME attribute causes problems with timestamp times before 1900 ?

2019-12-11 Thread Rockey Cui (Jira)
Rockey Cui created FLINK-15212:
--

 Summary:  PROCTIME attribute causes problems with timestamp times 
before 1900 ?
 Key: FLINK-15212
 URL: https://issues.apache.org/jira/browse/FLINK-15212
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.9.1
 Environment: flink 1.9.1

jdk1.8.0_211

idea2019.3
Reporter: Rockey Cui


A simple DataStreamSource with timestamp registered as a table.

 
{code:java}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
DataStreamSource stringDataStreamSource = env.fromElements(
"1001,1002,adc0,1900-01-01 00:00:00.0",
"1002,1003,adc1,1910-01-01 00:00:00.0",
"1003,1004,adc2,1920-01-01 00:00:00.0",
"1004,1005,adc3,1930-01-01 00:00:00.0",
"1005,1006,adc4,1970-01-01 00:00:00.0",
",,adc5,1971-01-01 00:00:00.0"
);
TypeInformation[] fieldTypes = new TypeInformation[]{Types.LONG, Types.LONG, 
Types.STRING, Types.SQL_TIM
String[] fieldNames = new String[]{"id", "cityId", "url", "clickTime"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames);
DataStream stream = stringDataStreamSource.map((MapFunction) 
s -> {
String[] split = s.split(",");
Row row = new Row(split.length);
for (int i = 0; i < split.length; i++) {
Object value = null;
if (fieldTypes[i].equals(Types.STRING)) {
value = split[i];
}
if (fieldTypes[i].equals(Types.LONG)) {
value = Long.valueOf(split[i]);
}
if (fieldTypes[i].equals(Types.INT)) {
value = Integer.valueOf(split[i]);
}
if (fieldTypes[i].equals(Types.DOUBLE)) {
value = Double.valueOf(split[i]);
}
if (fieldTypes[i].equals(Types.SQL_TIMESTAMP)) {
value = Timestamp.valueOf(split[i]);
}
row.setField(i, value);
}
//System.out.println(row.toString());
return row;
}).returns(rowTypeInfo);
tableEnv.registerDataStream("user_click_info", stream, String.join(",", 
fieldNames) + ",www.proctime");
String sql = "select * from user_click_info";
Table table = tableEnv.sqlQuery(sql);
DataStream result = tableEnv.toAppendStream(table, Row.class);
result.print();
table.printSchema();
tableEnv.execute("Test");
{code}
result ==>

 

root
 |-- id: BIGINT
 |-- cityId: BIGINT
 |-- url: STRING
 |-- clickTime: TIMESTAMP(3)
 |-- www: TIMESTAMP(3) *PROCTIME*

 

1001,1002,adc0,{color:#FF}1899-12-31 23:54:17.0{color},2019-12-12 
03:37:18.036
1002,1003,adc1,1910-01-01 00:00:00.0,2019-12-12 03:37:18.196
1003,1004,adc2,1920-01-01 00:00:00.0,2019-12-12 03:37:18.196
1004,1005,adc3,1930-01-01 00:00:00.0,2019-12-12 03:37:18.196
1005,1006,adc4,1970-01-01 00:00:00.0,2019-12-12 03:37:18.196
,,adc5,1971-01-01 00:00:00.0,2019-12-12 03:37:18.196

without  PROCTIME attribute is OK ==>

 

root
 |-- id: BIGINT
 |-- cityId: BIGINT
 |-- url: STRING
 |-- clickTime: TIMESTAMP(3)

 

1001,1002,adc0,1900-01-01 00:00:00.0
1002,1003,adc1,1910-01-01 00:00:00.0
1003,1004,adc2,1920-01-01 00:00:00.0
1004,1005,adc3,1930-01-01 00:00:00.0
1005,1006,adc4,1970-01-01 00:00:00.0
,,adc5,1971-01-01 00:00:00.0
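
A possible explanation, not confirmed anywhere in this report: the 5 minute 43 second shift in the first row matches the gap between local mean time and standard time in some tz database zones. The sketch below assumes the reporter's JVM default zone is Asia/Shanghai (an assumption; the issue does not state the zone) and only computes how much that zone's UTC offset differs between 1900 and 1910. The class and method names are hypothetical, and the sketch does not reproduce Flink's internal timestamp conversion.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

class PreStandardTimeOffset {

    /** Returns how many seconds the zone's UTC offset at 'early' exceeds its offset at 'late'. */
    static long offsetShiftSeconds(ZoneId zone, LocalDateTime early, LocalDateTime late) {
        ZoneOffset earlyOffset = zone.getRules().getOffset(early);
        ZoneOffset lateOffset = zone.getRules().getOffset(late);
        return earlyOffset.getTotalSeconds() - lateOffset.getTotalSeconds();
    }

    public static void main(String[] args) {
        // Assumption: the zone is not given in the report; Asia/Shanghai is a guess.
        ZoneId zone = ZoneId.of("Asia/Shanghai");
        LocalDateTime y1900 = LocalDateTime.of(1900, 1, 1, 0, 0);
        LocalDateTime y1910 = LocalDateTime.of(1910, 1, 1, 0, 0);

        long shift = offsetShiftSeconds(zone, y1900, y1910);
        System.out.println("offset difference in seconds: " + shift);
        // If a conversion applied the later offset to the 1900 value, the wall-clock
        // time would move back by this difference.
        System.out.println("1900-01-01 00:00:00 shifted back: " + y1900.minusSeconds(shift));
    }
}
```

If the difference printed here equals the shift seen in the first output row, the row is probably affected by a local-mean-time offset rather than by the PROCTIME attribute itself, although that would still leave open why the shift only appears when PROCTIME is declared.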



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Overwrite and partition inserting support in 1.10

2019-12-11 Thread Jingsong Li
Hi Jark,

> The dialect restriction is introduced on purpose, because OVERWRITE and
PARTITION syntax are not SQL standard.

My understanding is that watermark [1] is also a non-standard grammar. We
can extend SQL standard syntax.

> Even in the discussion of FLIP-63, the community have different opinion
on whether the partition fields should be declared in schema part or not
[1]. The status of FLIP-63 is that limit the proposed syntax in Hive
dialect, and we may propose new built-in syntax for creating partition
tables in the future.

No, I'm not talking about creating partition tables. I'm talking about
inserting into tables. FLIP-63 isn't meant only for the Hive dialect. Flink
itself may need partition table support. I'm sure this work will be
strengthened further in 1.11.
I remember that we only had disputes about creating partition tables, so I
moved that to the to-be-discussed chapter and limited it to the Hive dialect.
That applies only to creating partition tables, right?

> FLIP-89 proposed to make TableConfig configurable including the dialect
configuration, i.g. `table.planner.sql-dialect`. So the only thing we need
to do is introducing such a configuration.

I'm not sure we should introduce FLIP-89, and dialect has no other function
except for overwrite and partition inserting at present, I think they
should be Flink syntax.

And removing the Hive dialect limitation is a very small change/fix: just
remove a few lines of checks.

What do you think? (If you think so, after some discussion, we can start a
vote later.)

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL

Best,
Jingsong Lee

On Thu, Dec 12, 2019 at 11:11 AM Rui Li  wrote:

> +1 to fix it in 1.10. If this feature doesn't work via SQL CLI, I guess it
> doesn't work for most Hive users.
> +0 to remove the dialect check. I don't see much benefit this check can
> bring to users, except that it prevents users from accidentally using some
> Hive features, which doesn't seem to be very harmful anyway. But I don't
> have a strong opinion for it and it probably needs more thorough
> discussion.
>
>
> On Thu, Dec 12, 2019 at 7:49 AM Jark Wu  wrote:
>
> > Thanks Jingsong,
> >
> > OVERWRITE and PARTITION are very fundamental features for Hive users.
> > I'm sorry to hear that it doesn't work in SQL CLI.
> >
> > > Remove hive dialect limitation for these two grammars?
> > The dialect restriction is introduced on purpose, because OVERWRITE and
> > PARTITION syntax
> > are not SQL standard. Even in the discussion of FLIP-63, the community
> > have different opinion on
> > whether the partition fields should be declared in schema part or not
> [1].
> > The status of FLIP-63 is
> > that limit the proposed syntax in Hive dialect, and we may propose new
> > built-in syntax for creating
> >  partition tables in the future. So I'm -1 to removing dialect
> limitation.
> >
> > > Should we fix this in 1.10?
> > From my point of view, the problem is that users can't switch dialects in
> > SQL CLI, because it is not
> >  exposed as a configuration. FLIP-89 [2] proposed to make TableConfig
> > configurable including the
> >  dialect configuration, i.g. `table.planner.sql-dialect`. So the only
> > thing we need to do is introducing
> > such a configuration, but this is definitely not a *bug*. However,
> > considering it is a small change,
> > and from user perspective, I'm +1 to introducing such a configuration.
> >
> > Best,
> > Jark
> >
> > [1]:
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-63-Rework-table-partition-support-tp32770p33510.html
> > [2]:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-89%3A+Improve+usability+of+TableConfig
> >
> > On Thu, 12 Dec 2019 at 03:24, Bowen Li  wrote:
> >
> >> Hi Jingsong,
> >>
> >> Thanks a lot for reporting this issue.
> >>
> >> IIRC, we added [INSERT OVERWRITE] and [PARTITION] clauses to support
> Hive
> >> integration before FLIP-63 was proposed to introduce generic partition
> >> support to Flink. Thus when we added these syntax, we were intentionally
> >> conservative and limited their scope to Hive dialect. @Rui may help to
> >> confirm that. I'm a bit surprised about it as well. As core APIs of
> >> FLIP-63
> >> were done, I don't see why we would limit these syntax to Hive dialect
> >> alone. It's just unfortunate that we may have forgotten to revisit this
> >> topic and we apparently missed some test cases on SQL CLI side. Sorry
> for
> >> that.
> >>
> >> From a product perspective, SQL CLI is super critical for Flink-Hive
> >> integration and Flink SQL itself. INSERT OVERWRITE and PARTITION are
> two
> >> of the most commonly used syntax in Hive, the product wouldn't be
> useable
> >> without such proper support.
> >>
> >> Thus, I think this is a *bug*, we can stop limiting them in Hive
> dialect,
> >> and should fix it in 1.10. We should also add more test coverage for SQL
> >> CLI to avoid such surprise.
> >>
> >> Cheers,
> >> Bowen
> >>

Re: [DISCUSS] Overwrite and partition inserting support in 1.10

2019-12-11 Thread Rui Li
+1 to fix it in 1.10. If this feature doesn't work via SQL CLI, I guess it
doesn't work for most Hive users.
+0 to remove the dialect check. I don't see much benefit this check can
bring to users, except that it prevents users from accidentally using some
Hive features, which doesn't seem to be very harmful anyway. But I don't
have a strong opinion for it and it probably needs more thorough discussion.


On Thu, Dec 12, 2019 at 7:49 AM Jark Wu  wrote:

> Thanks Jingsong,
>
> OVERWRITE and PARTITION are very fundamental features for Hive users.
> I'm sorry to hear that it doesn't work in SQL CLI.
>
> > Remove hive dialect limitation for these two grammars?
> The dialect restriction is introduced on purpose, because OVERWRITE and
> PARTITION syntax
> are not SQL standard. Even in the discussion of FLIP-63, the community
> have different opinion on
> whether the partition fields should be declared in schema part or not [1].
> The status of FLIP-63 is
> that limit the proposed syntax in Hive dialect, and we may propose new
> built-in syntax for creating
>  partition tables in the future. So I'm -1 to removing dialect limitation.
>
> > Should we fix this in 1.10?
> From my point of view, the problem is that users can't switch dialects in
> SQL CLI, because it is not
>  exposed as a configuration. FLIP-89 [2] proposed to make TableConfig
> configurable including the
>  dialect configuration, i.g. `table.planner.sql-dialect`. So the only
> thing we need to do is introducing
> such a configuration, but this is definitely not a *bug*. However,
> considering it is a small change,
> and from user perspective, I'm +1 to introducing such a configuration.
>
> Best,
> Jark
>
> [1]:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-63-Rework-table-partition-support-tp32770p33510.html
> [2]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-89%3A+Improve+usability+of+TableConfig
>
> On Thu, 12 Dec 2019 at 03:24, Bowen Li  wrote:
>
>> Hi Jingsong,
>>
>> Thanks a lot for reporting this issue.
>>
>> IIRC, we added [INSERT OVERWRITE] and [PARTITION] clauses to support Hive
>> integration before FLIP-63 was proposed to introduce generic partition
>> support to Flink. Thus when we added these syntax, we were intentionally
>> conservative and limited their scope to Hive dialect. @Rui may help to
>> confirm that. I'm a bit surprised about it as well. As core APIs of
>> FLIP-63
>> were done, I don't see why we would limit these syntax to Hive dialect
>> alone. It's just unfortunate that we may have forgotten to revisit this
>> topic and we apparently missed some test cases on SQL CLI side. Sorry for
>> that.
>>
>> From a product perspective, SQL CLI is super critical for Flink-Hive
>> integration and Flink SQL itself. INSERT OVERWRITE and PARTITION are two
>> of the most commonly used syntax in Hive, the product wouldn't be useable
>> without such proper support.
>>
>> Thus, I think this is a *bug*, we can stop limiting them in Hive dialect,
>> and should fix it in 1.10. We should also add more test coverage for SQL
>> CLI to avoid such surprise.
>>
>> Cheers,
>> Bowen
>>
>> On Wed, Dec 11, 2019 at 1:29 AM Jingsong Li 
>> wrote:
>>
>> > Hi Dev,
>> >
>> > After cutting out the branch of 1.10, I tried the following functions of
>> > SQL-CLI and found that it does not support:
>> > - insert overwrite
>> > - PARTITION (partcol1=val1, partcol2=val2 ...)
>> > The SQL pattern is:
>> > INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1,
>> > partcol2=val2 ...)] select_statement1 FROM from_statement;
>> > It is a surprise to me.
>> > The reason is that we only allow these two grammars in hive dialect. And
>> > SQL-CLI does not have an interface to switch dialects.
>> >
>> > Because it directly hinders the SQL-CLI's insert syntax in hive
>> integration
>> > and seriously hinders the practicability of SQL-CLI.
>> > And we have introduced these two grammars in FLIP-63 [1] to Flink.
>> > Here are my questions:
>> > 1.Should we remove hive dialect limitation for these two grammars?
>> > 2.Should we fix this in 1.10?
>> >
>> > [1]
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
>> >
>> > Best,
>> > Jingsong Lee
>> >
>>
>

-- 
Best regards!
Rui Li


[jira] [Created] (FLINK-15211) Web UI request url for watermark is too long in large parallelism

2019-12-11 Thread fa zheng (Jira)
fa zheng created FLINK-15211:


 Summary: Web UI request url for watermark is too long in large 
parallelism
 Key: FLINK-15211
 URL: https://issues.apache.org/jira/browse/FLINK-15211
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.9.1, 1.9.0
Reporter: fa zheng
 Fix For: 1.10.0


With large parallelism, the request URL to get the watermark metric seems to be 
too long to get a response. The request URL is:

[http://0.0.0.0:8081/jobs/:jobid/vertices/:vertexid/metrics?get=0.currentInputWatermark,..,1199.currentInputWatermark]

The reason is 
org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException: An 
HTTP line is larger than 4096 bytes.

 

It needs to paginate the request and merge the results.
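
The pagination idea can be sketched as follows. This is only an illustration, not the Web UI's code: the class name, the helper names, and the 3000-character budget for the `get=` value are assumptions, chosen to leave headroom below the 4096-byte line limit from the exception above. The sketch splits the metric names into batches and leaves the HTTP calls and the merging of responses to the caller.

```java
import java.util.ArrayList;
import java.util.List;

class WatermarkMetricBatcher {

    /** Builds the per-subtask metric names "0.currentInputWatermark", "1.currentInputWatermark", ... */
    static List<String> watermarkMetricNames(int parallelism) {
        List<String> names = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            names.add(subtask + ".currentInputWatermark");
        }
        return names;
    }

    /** Splits names into batches whose comma-joined length does not exceed maxQueryChars. */
    static List<List<String>> batch(List<String> metricNames, int maxQueryChars) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentLength = 0;
        for (String name : metricNames) {
            // A separating comma is needed for every name except the first in a batch.
            int extra = current.isEmpty() ? name.length() : name.length() + 1;
            if (!current.isEmpty() && currentLength + extra > maxQueryChars) {
                batches.add(current);
                current = new ArrayList<>();
                currentLength = 0;
                extra = name.length();
            }
            current.add(name);
            currentLength += extra;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Assumed budget: 3000 characters for the "get=" value, leaving room for the path.
        List<List<String>> batches = batch(watermarkMetricNames(1200), 3000);
        System.out.println("requests needed: " + batches.size());
        for (List<String> b : batches) {
            // Each batch would become one request: .../metrics?get=<joined names>
            System.out.println(b.size() + " metrics, query length " + String.join(",", b).length());
        }
    }
}
```

Each batch maps to one request against the metrics endpoint, and the client concatenates the responses before rendering.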





[jira] [Created] (FLINK-15210) Move java files in flink-sql-parser module package org.apache.calcite.sql to org.apache.flink.sql.parser.type

2019-12-11 Thread Danny Chen (Jira)
Danny Chen created FLINK-15210:
--

 Summary: Move java files in flink-sql-parser module package 
org.apache.calcite.sql to org.apache.flink.sql.parser.type
 Key: FLINK-15210
 URL: https://issues.apache.org/jira/browse/FLINK-15210
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: Danny Chen
 Fix For: 1.11.0








Re: [DISCUSS] Adding e2e tests for Flink's Mesos integration

2019-12-11 Thread Yangze Guo
Thanks for the feedback, Gary.

Regarding the WordCount test:
- True. There is no test coverage increment compared to others.
However, I think each test case should not serve multiple purposes, so
that we can find the root cause quickly. As discussed in
FLINK-15135[1], I prefer including only the WordCount test as the first
step. If the time overhead of the E2E tests becomes severe in the future, I
agree to remove it. WDYT?
- I think the main overhead comes from building the image. The
subsequent tests will run fast since they will not build it again.

Regarding the Rocks test, I think it is a typical scenario using
off-heap memory. The main purpose is to verify the memory usage and
memory configuration in Mesos mode. Two typical use cases are off-heap
and on-heap. Thus, I think the following two test cases are valuable
to be included:
- A streaming task using heap backend. It should explicitly set the
“taskmanager.memory.managed.size” to zero to check the potential
unexpected usage of off-heap memory.
- A streaming task using rocks backend. It covers the scenario using
off-heap memory.

Look forward to your kind feedback.

[1]https://issues.apache.org/jira/browse/FLINK-15135

Best,
Yangze Guo



On Wed, Dec 11, 2019 at 6:14 PM Gary Yao  wrote:
>
> Thanks for driving this effort. Also +1 from my side. I have left a few
> questions below.
>
> > - Wordcount end-to-end test. For verifying the basic process of Mesos
> > deployment.
>
> Would this add additional test coverage compared to the
> "multiple submissions" test case? I am asking because the E2E tests are
> already
> expensive to run, and adding new tests should be carefully considered.
>
> > - State TTL RocksDb backend end-to-end test. For verifying memory
> > configuration behaviors, since Mesos has it’s own config options and
> > logics.
>
> Can you elaborate more on this? Which config options are relevant here?
>
> On Wed, Dec 11, 2019 at 9:58 AM Till Rohrmann  wrote:
>
> > +1 for building the image locally. If need should arise, then we could
> > always change it later.
> >
> > Cheers,
> > Till
> >
> > On Wed, Dec 11, 2019 at 4:05 AM Xintong Song 
> > wrote:
> >
> > > Thanks, Yangze.
> > >
> > > +1 for building the image locally.
> > > The time consumption for both building image locally and pulling it from
> > > DockerHub sounds reasonable and affordable. Therefore, I'm also in favor
> > of
> > > avoiding the cost of maintaining a custom image.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Wed, Dec 11, 2019 at 10:11 AM Yangze Guo  wrote:
> > >
> > > > Thanks for the feedback, Yang.
> > > >
> > > > Some updates I want to share in this thread.
> > > > I have built a PoC version of Mesos e2e test with WordCount
> > > > workflow.[1] Then, I ran it in the testing environment. As the result
> > > > shown here[2]:
> > > > - For pulling image from DockerHub, it took 1 minute and 21 seconds
> > > > - For building it locally, it took 2 minutes and 54 seconds.
> > > >
> > > > I prefer building it locally. Although it is slower, I think the time
> > > > overhead, comparing to the cost of maintaining the image in DockerHub
> > > > and the whole test process, is trivial for building or pulling the
> > > > image.
> > > >
> > > > I look forward to hearing from you. ;)
> > > >
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > [1]
> > > >
> > >
> > https://github.com/KarmaGYZ/flink/commit/0406d942446a1b17f81d93235b21a829bf88ccf0
> > > > [2]https://travis-ci.org/KarmaGYZ/flink/jobs/623207957
> > > > Best,
> > > > Yangze Guo
> > > >
> > > > On Mon, Dec 9, 2019 at 2:39 PM Yang Wang 
> > wrote:
> > > > >
> > > > > Thanks Yangze for starting this discussion.
> > > > >
> > > > > Just share my thoughts.
> > > > >
> > > > > If the mesos official docker image could not meet our requirement, i
> > > > suggest to build the image locally.
> > > > > We have done the same things for yarn e2e tests. This way is more
> > > > flexible and easy to maintain. However,
> > > > > i have no idea how long building the mesos image locally will take.
> > > > Based on previous experience of yarn, i
> > > > > think it may not take too much time.
> > > > >
> > > > >
> > > > >
> > > > > Best,
> > > > > Yang
> > > > >
> > > > > On Sat, Dec 7, 2019 at 4:25 PM Yangze Guo wrote:
> > > > >>
> > > > >> Thanks for your feedback!
> > > > >>
> > > > >> @Till
> > > > >> Regarding the time overhead, I think it mainly comes from the network
> > > > >> transmission. For building the image locally, it will totally
> > download
> > > > >> 260MB files including the base image and packages. For pulling from
> > > > >> DockerHub, the compressed size of the image is 347MB. Thus, I agree
> > > > >> that it is ok to build the image locally.
> > > > >>
> > > > >> @Piyush
> > > > >> Thank you for offering the help and sharing your usage scenario. In
> > > > >> current stage, I think it will be really helpful if you can compress
> > > > >> the custom image[1] or reduce the time overhead to build it locally.
> 

[jira] [Created] (FLINK-15209) DDL with computed column didn't work for some of the connectors

2019-12-11 Thread Danny Chen (Jira)
Danny Chen created FLINK-15209:
--

 Summary: DDL with computed column didn't work for some of the 
connectors
 Key: FLINK-15209
 URL: https://issues.apache.org/jira/browse/FLINK-15209
 Project: Flink
  Issue Type: Bug
Reporter: Danny Chen








Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-11 Thread Jark Wu
Hi Becket,

I think Dawid explained things clearly and makes a lot of sense.
I'm also in favor of #2, because #1 doesn't work for our future unified
environment.

You can see the vision in this documentation [1]. In the future, we would
like to
drop the global streaming/batch mode in SQL (i.e.
EnvironmentSettings#inStreamingMode/inBatchMode).
A source is bounded or unbounded once defined, so queries can be inferred
from source to run
in streaming or batch or hybrid mode. However, in #1, we will lose this
ability because the framework
doesn't know whether the source is bounded or unbounded.

Best,
Jark


[1]:
https://docs.google.com/document/d/1yrKXEIRATfxHJJ0K3t6wUgXAtZq8D-XgvEnvl2uUcr0/edit#heading=h.v4ib17buma1p
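
The inference described above, where the framework derives the execution mode from the sources instead of a global setting, can be sketched roughly as follows. This is a simplified illustration and not the FLIP-27 API: the `Source` interface here has only the `getBoundedness` method that Dawid argues for in the quoted message, `sourceOf` and `inferMode` are hypothetical helpers, and the hybrid mode mentioned above is left out.

```java
import java.util.Arrays;
import java.util.List;

class BoundednessSketch {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    enum ExecutionMode { BATCH, STREAMING }

    /** Hypothetical, heavily simplified stand-in for the Source interface under discussion. */
    interface Source {
        Boundedness getBoundedness();
    }

    /** Hypothetical factory: boundedness is fixed when the source is defined. */
    static Source sourceOf(Boundedness boundedness) {
        return () -> boundedness;
    }

    /** Simplified rule: batch only if every source is bounded, otherwise streaming. */
    static ExecutionMode inferMode(List<Source> sources) {
        for (Source source : sources) {
            if (source.getBoundedness() == Boundedness.CONTINUOUS_UNBOUNDED) {
                return ExecutionMode.STREAMING;
            }
        }
        return ExecutionMode.BATCH;
    }

    public static void main(String[] args) {
        Source untilTimestamp = sourceOf(Boundedness.BOUNDED);
        Source continuous = sourceOf(Boundedness.CONTINUOUS_UNBOUNDED);
        System.out.println(inferMode(Arrays.asList(untilTimestamp)));
        System.out.println(inferMode(Arrays.asList(untilTimestamp, continuous)));
    }
}
```

With the boundedness passed in from the environment instead (option #1), `inferMode` would have nothing to inspect, which is the loss of ability described above.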

On Wed, 11 Dec 2019 at 20:52, Piotr Nowojski  wrote:

> Hi,
>
> Regarding the:
>
> Collection getNextRecords()
>
> I’m pretty sure such design would unfortunately impact the performance
> (accessing and potentially creating the collection on the hot path).
>
> Also the
>
> InputStatus emitNext(DataOutput output) throws Exception;
> or
> Status pollNext(SourceOutput sourceOutput) throws Exception;
>
> Gives us some opportunities in the future, to allow Source hot looping
> inside, until it receives some signal “please exit because of some reasons”
> (output collector could return such hint upon collecting the result). But
> that’s another topic outside of this FLIP’s scope.
>
> Piotrek
>
> > On 11 Dec 2019, at 10:41, Till Rohrmann  wrote:
> >
> > Hi Becket,
> >
> > quick clarification from my side because I think you misunderstood my
> > question. I did not suggest to let the SourceReader return only a single
> > record at a time when calling getNextRecords. As the return type
> indicates,
> > the method can return an arbitrary number of records.
> >
> > Cheers,
> > Till
> >
> > On Wed, Dec 11, 2019 at 10:13 AM Dawid Wysakowicz <
> dwysakow...@apache.org >
> > wrote:
> >
> >> Hi Becket,
> >>
> >> Issue #1 - Design of Source interface
> >>
> >> I mentioned the lack of a method like
> Source#createEnumerator(Boundedness
> >> boundedness, SplitEnumeratorContext context), because without it the
> current
> >> proposal is not complete/does not work.
> >>
> >> If we say that boundedness is an intrinsic property of a source imo we
> >> don't need the Source#createEnumerator(Boundedness boundedness,
> >> SplitEnumeratorContext context) method.
> >>
> >> Assuming a source from my previous example:
> >>
> >> Source source = KafkaSource.builder()
> >>  ...
> >>  .untilTimestamp(...)
> >>  .build()
> >>
> >> Would the enumerator differ if created like
> >> source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs source
> >> .createEnumerator(BOUNDED, ...)? I know I am repeating myself, but this
> is
> >> the part where my opinion differs the most from the current proposal. I
> >> really think it should always be the source that tells if it is bounded
> or
> >> not. In the current proposal methods continousSource/boundedSource
> somewhat
> >> reconfigure the source, which I think is misleading.
> >>
> >> I think a call like:
> >>
> >> Source source = KafkaSource.builder()
> >>  ...
> >>  .readContinously() / readUntilLatestOffset() / readUntilTimestamp /
> readUntilOffsets / ...
> >>  .build()
> >>
> >> is way cleaner (and expressive) than
> >>
> >> Source source = KafkaSource.builder()
> >>  ...
> >>  .build()
> >>
> >>
> >> env.continousSource(source) // which actually underneath would call
> createEnumerator(CONTINUOUS, ctx) which would be equivalent to
> source.readContinously().createEnumerator(ctx)
> >> // or
> >> env.boundedSource(source) // which actually underneath would call
> createEnumerator(BOUNDED, ctx) which would be equivalent to
> source.readUntilLatestOffset().createEnumerator(ctx)
> >>
> >>
> >> Sorry for the comparison, but to me it seems there is too much magic
> >> happening underneath those two calls.
> >>
> >> I really believe the Source interface should have getBoundedness method
> >> instead of (supportBoundedness) + createEnumerator(Boundedness, ...)
> >>
> >>
> >> Issue #2 - Design of
> >> ExecutionEnvironment#source()/continuousSource()/boundedSource()
> >>
> >> As you might have guessed I am slightly in favor of option #2 modified.
> >> Yes I am aware every step of the dag would have to be able to say if it
> is
> >> bounded or not. I have a feeling it would be easier to express cross
> >> bounded/unbounded operations, but I must admit I have not thought it
> >> through thoroughly, In the spirit of batch is just a special case of
> >> streaming I thought BoundedStream would extend from DataStream. Correct
> me
> >> if I am wrong. In such a setup the cross bounded/unbounded operation
> could
> >> be expressed quite easily I think:
> >>
> >> DataStream {
> >>  DataStream join(DataStream, ...); // we could not really tell if the
> result is bounded or not, but because bounded stream is a special case of
> unbounded the API object is correct, irrespective if 

[jira] [Created] (FLINK-15208) support client to submit both an online streaming job and an offline batch job based on dynamic catalog table

2019-12-11 Thread Bowen Li (Jira)
Bowen Li created FLINK-15208:


 Summary: support client to submit both an online streaming job and 
an offline batch job based on dynamic catalog table
 Key: FLINK-15208
 URL: https://issues.apache.org/jira/browse/FLINK-15208
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API, Table SQL / Client
Reporter: Bowen Li
Assignee: Bowen Li


with dynamic catalog table in FLINK-15206, users can maintain a single SQL job 
for both their online and offline job. However, they still need to change their 
configurations in order to submit different jobs over time.

E.g. when users update logic of their streaming job, they need to bootstrap 
both a new online job and backfill offline job, let's call them sub-jobs of a 
job with dynamic catalog table. They would have to 
1) manually change execution mode in yaml config to "streaming" and submit the 
streaming job 
2) manually change execution mode in yaml config to "batch" and submit the 
batch job

we should introduce a mechanism to allow users to submit all or a subset of 
sub-jobs all at once. In the backfill use case mentioned above, ideally users 
should just execute the SQL once, and Flink should spin up two jobs for our 
users. 

Streaming platforms at some big companies like Uber and Netflix are already kind 
of doing this for backfill use cases one way or another - some do it in UI, 
some do it in planning phase. Would be great to standardize this practice and 
provide users with ultimate simplicity.

The assumption here is that users are fully aware of the consequences of 
launching two/multiple jobs at the same time. E.g. they need to handle 
overlapped results if there's any, 





[jira] [Created] (FLINK-15207) japicmp reference version is stale

2019-12-11 Thread Zili Chen (Jira)
Zili Chen created FLINK-15207:
-

 Summary: japicmp reference version is stale
 Key: FLINK-15207
 URL: https://issues.apache.org/jira/browse/FLINK-15207
 Project: Flink
  Issue Type: Bug
  Components: Build System
Reporter: Zili Chen


{{jamicmp.referenceVersion}} property is still {{1.8.0}}. Given now we're in a 
releasing process I'm not sure what to do with this. Besides, there is a typo 
in the property.

cc [~chesnay]





[jira] [Created] (FLINK-15206) support dynamic catalog table for unified SQL job

2019-12-11 Thread Bowen Li (Jira)
Bowen Li created FLINK-15206:


 Summary: support dynamic catalog table for unified SQL job
 Key: FLINK-15206
 URL: https://issues.apache.org/jira/browse/FLINK-15206
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.11.0


Currently, if users have both an online and an offline job with the same 
business logic in Flink SQL, their codebase is still not unified. They have to 
keep two SQL statements whose only difference is the source (and/or sink) table. 
E.g.


{code:sql}
-- online job
insert into x select * from kafka_table;

-- offline backfill job
insert into x select * from hive_table;
{code}

We would like to introduce a "dynamic catalog table". The dynamic catalog table 
acts like a view: it is just an abstraction over the actual source tables behind 
it, selected by configuration. When a job is executed, the dynamic catalog table 
points to one actual source table, depending on the configuration.

A use case for this is the example given above: users want to keep just one 
SQL statement, {{insert into x select * from my_source_dynamic_table;}}. 
When executed in streaming mode, {{my_source_dynamic_table}} should point 
to a Kafka catalog table; in batch mode, it should point to a Hive catalog 
table.
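
The resolution step described above could be sketched roughly as follows. This 
is only an illustration under stated assumptions: {{DynamicTableResolver}}, 
{{pointTo}} and {{resolve}} are hypothetical names rather than Flink APIs, and 
the table is swapped by plain whole-word text substitution, whereas a real 
implementation would presumably resolve the table in the catalog/planner.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration only; not a Flink class. */
final class DynamicTableResolver {

    private final String dynamicTable;
    // execution mode (e.g. "streaming", "batch") -> actual table name
    private final Map<String, String> actualTableByMode = new HashMap<>();

    DynamicTableResolver(String dynamicTable) {
        this.dynamicTable = dynamicTable;
    }

    /** Configures which actual table the dynamic table points to in a given mode. */
    DynamicTableResolver pointTo(String mode, String actualTable) {
        actualTableByMode.put(mode, actualTable);
        return this;
    }

    /** Rewrites the statement so that it reads from the actual table for the mode. */
    String resolve(String sql, String mode) {
        String actual = actualTableByMode.get(mode);
        if (actual == null) {
            throw new IllegalArgumentException("No table configured for mode: " + mode);
        }
        // Naive whole-word substitution; good enough for a sketch, not for real SQL.
        return sql.replaceAll(
                "\\b" + Pattern.quote(dynamicTable) + "\\b",
                Matcher.quoteReplacement(actual));
    }

    public static void main(String[] args) {
        DynamicTableResolver resolver =
                new DynamicTableResolver("my_source_dynamic_table")
                        .pointTo("streaming", "kafka_table")
                        .pointTo("batch", "hive_table");
        String sql = "insert into x select * from my_source_dynamic_table";
        System.out.println(resolver.resolve(sql, "streaming"));
        System.out.println(resolver.resolve(sql, "batch"));
    }
}
```

The point of the sketch is only that the user-facing statement stays the same 
while the configuration decides which of the two original statements is run.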






Re: [DISCUSS] Overwrite and partition inserting support in 1.10

2019-12-11 Thread Jark Wu
Thanks Jingsong,

OVERWRITE and PARTITION are very fundamental features for Hive users.
I'm sorry to hear that it doesn't work in SQL CLI.

> Remove hive dialect limitation for these two grammars?
The dialect restriction was introduced on purpose, because the OVERWRITE and
PARTITION syntax is not part of the SQL standard. Even in the discussion of
FLIP-63, the community had different opinions on whether the partition fields
should be declared in the schema part or not [1]. The current status of FLIP-63
is that the proposed syntax is limited to the Hive dialect, and we may propose
new built-in syntax for creating partitioned tables in the future. So I'm -1 to
removing the dialect limitation.

> Should we fix this in 1.10?
From my point of view, the problem is that users can't switch dialects in the
SQL CLI, because the dialect is not exposed as a configuration. FLIP-89 [2]
proposed to make TableConfig configurable, including the dialect configuration,
e.g. `table.planner.sql-dialect`. So the only thing we need to do is introduce
such a configuration, but this is definitely not a *bug*. However, considering
that it is a small change, and from the user's perspective, I'm +1 to
introducing such a configuration.

Best,
Jark

[1]:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-63-Rework-table-partition-support-tp32770p33510.html
[2]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-89%3A+Improve+usability+of+TableConfig

On Thu, 12 Dec 2019 at 03:24, Bowen Li  wrote:

> Hi Jingsong,
>
> Thanks a lot for reporting this issue.
>
> IIRC, we added [INSERT OVERWRITE] and [PARTITION] clauses to support Hive
> integration before FLIP-63 was proposed to introduce generic partition
> support to Flink. Thus when we added these syntax, we were intentionally
> conservative and limited their scope to Hive dialect. @Rui may help to
> confirm that. I'm a bit surprised about it as well. As core APIs of FLIP-63
> were done, I don't see why we would limit these syntax to Hive dialect
> alone. It's just unfortunately that we may have forgot to revisit this
> topic and we apparently missed some test cases on SQL CLI side. Sorry for
> that.
>
> From a product perspective, SQL CLI is super critical for Flink-Hive
> integration and Flink SQL iteself. INSERT OVERWRITE and PARTITION are two
> of the most commonly used syntax in Hive, the product wouldn't be useable
> without such proper support.
>
> Thus, I think this is a *bug*, we can stop limiting them in Hive dialect,
> and should fix it in 1.10. We should also add more test coverage for SQL
> CLI to avoid such surprise.
>
> Cheers,
> Bowen
>
> On Wed, Dec 11, 2019 at 1:29 AM Jingsong Li 
> wrote:
>
> > Hi Dev,
> >
> > After cutting out the branch of 1.10, I tried the following functions of
> > SQL-CLI and found that it does not support:
> > - insert overwrite
> > - PARTITION (partcol1=val1, partcol2=val2 ...)
> > The SQL pattern is:
> > INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1,
> > partcol2=val2 ...) select_statement1 FROM from_statement;
> > It is a surprise to me.
> > The reason is that we only allow these two grammars in hive dialect. And
> > SQL-CLI does not have an interface to switch dialects.
> >
> > Because it directly hinders the SQL-CLI's insert syntax in hive
> integration
> > and seriously hinders the practicability of SQL-CLI.
> > And we have introduced these two grammars in FLIP-63 [1] to Flink.
> > Here are my question:
> > 1.Should we remove hive dialect limitation for these two grammars?
> > 2.Should we fix this in 1.10?
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
> >
> > Best,
> > Jingsong Lee
> >
>


[jira] [Created] (FLINK-15205) add doc and example of INSERT OVERWRITE and insert into partitioned table for Hive connector

2019-12-11 Thread Bowen Li (Jira)
Bowen Li created FLINK-15205:


 Summary: add doc and example of INSERT OVERWRITE and insert into 
partitioned table for Hive connector
 Key: FLINK-15205
 URL: https://issues.apache.org/jira/browse/FLINK-15205
 Project: Flink
  Issue Type: Task
  Components: Connectors / Hive, Documentation
Reporter: Bowen Li
Assignee: Rui Li
 Fix For: 1.10.0








[jira] [Created] (FLINK-15204) add documentation for Flink-Hive timestamp conversions in table and udf

2019-12-11 Thread Bowen Li (Jira)
Bowen Li created FLINK-15204:


 Summary: add documentation for Flink-Hive timestamp conversions in 
table and udf
 Key: FLINK-15204
 URL: https://issues.apache.org/jira/browse/FLINK-15204
 Project: Flink
  Issue Type: Task
  Components: Connectors / Hive, Documentation
Reporter: Bowen Li
Assignee: Rui Li
 Fix For: 1.10.0








[jira] [Created] (FLINK-15203) rephrase Hive's data types doc

2019-12-11 Thread Bowen Li (Jira)
Bowen Li created FLINK-15203:


 Summary: rephrase Hive's data types doc
 Key: FLINK-15203
 URL: https://issues.apache.org/jira/browse/FLINK-15203
 Project: Flink
  Issue Type: Task
  Components: Documentation
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.10.0








Re: [DISCUSS] Overwrite and partition inserting support in 1.10

2019-12-11 Thread Bowen Li
Hi Jingsong,

Thanks a lot for reporting this issue.

IIRC, we added the [INSERT OVERWRITE] and [PARTITION] clauses to support Hive
integration before FLIP-63 was proposed to introduce generic partition
support to Flink. Thus, when we added this syntax, we were intentionally
conservative and limited its scope to the Hive dialect. @Rui may help to
confirm that. I'm a bit surprised about it as well. As the core APIs of FLIP-63
are done, I don't see why we would limit this syntax to the Hive dialect
alone. It's just unfortunate that we may have forgotten to revisit this
topic, and we apparently missed some test cases on the SQL CLI side. Sorry for
that.

From a product perspective, SQL CLI is super critical for Flink-Hive
integration and Flink SQL itself. INSERT OVERWRITE and PARTITION are two
of the most commonly used syntaxes in Hive; the product wouldn't be usable
without proper support for them.

Thus, I think this is a *bug*: we can stop limiting them to the Hive dialect,
and should fix it in 1.10. We should also add more test coverage for SQL
CLI to avoid such surprises.

Cheers,
Bowen

On Wed, Dec 11, 2019 at 1:29 AM Jingsong Li  wrote:

> Hi Dev,
>
> After cutting out the branch of 1.10, I tried the following functions of
> SQL-CLI and found that it does not support:
> - insert overwrite
> - PARTITION (partcol1=val1, partcol2=val2 ...)
> The SQL pattern is:
> INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1,
> partcol2=val2 ...) select_statement1 FROM from_statement;
> It is a surprise to me.
> The reason is that we only allow these two grammars in hive dialect. And
> SQL-CLI does not have an interface to switch dialects.
>
> Because it directly hinders the SQL-CLI's insert syntax in hive integration
> and seriously hinders the practicability of SQL-CLI.
> And we have introduced these two grammars in FLIP-63 [1] to Flink.
> Here are my question:
> 1.Should we remove hive dialect limitation for these two grammars?
> 2.Should we fix this in 1.10?
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
>
> Best,
> Jingsong Lee
>


[jira] [Created] (FLINK-15202) Increment metric when Interval Join record is late

2019-12-11 Thread Chris Gillespie (Jira)
Chris Gillespie created FLINK-15202:
---

 Summary: Increment metric when Interval Join record is late
 Key: FLINK-15202
 URL: https://issues.apache.org/jira/browse/FLINK-15202
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Chris Gillespie


Currently an interval join will silently drop late records. I think it would be 
quite useful to have the metric *numLateRecordsDropped* incremented when this 
happens. Window Join already does this, so it would make sense to have it 
happen for both types of joins.
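
To make the request concrete, here is a minimal sketch of the proposed 
behaviour. It is not the IntervalJoinOperator code: {{LateRecordCounter}} and 
its methods are hypothetical, a plain {{long}} field stands in for a real Flink 
metric, and "late" is assumed to mean "timestamp behind the current watermark".

```java
/** Hypothetical illustration only; a plain counter stands in for a Flink metric. */
final class LateRecordCounter {

    private long currentWatermark = Long.MIN_VALUE;
    private long numLateRecordsDropped = 0;

    /** Watermarks only move forward. */
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /**
     * Returns true if the record is accepted. A late record is still dropped,
     * but no longer silently: the counter is incremented first.
     */
    boolean offer(long timestamp) {
        // Assumption: a record is late if its timestamp is behind the watermark.
        boolean late = currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
        if (late) {
            numLateRecordsDropped++;
            return false;
        }
        return true;
    }

    long getNumLateRecordsDropped() {
        return numLateRecordsDropped;
    }

    public static void main(String[] args) {
        LateRecordCounter counter = new LateRecordCounter();
        counter.offer(5L);             // accepted: no watermark seen yet
        counter.advanceWatermark(10L);
        counter.offer(9L);             // dropped and counted
        counter.offer(10L);            // accepted: not behind the watermark
        System.out.println(counter.getNumLateRecordsDropped());
    }
}
```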


Mailing List Idea: 
[https://lists.apache.org/thread.html/f463d81db3777a529d1764723189a57c33d601a6c142441fb5036ff9%40%3Cuser.flink.apache.org%3E]



Window Join:
[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L424]

Interval Join:
[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L222]





[jira] [Created] (FLINK-15201) Remove verifications in detach execution

2019-12-11 Thread Zili Chen (Jira)
Zili Chen created FLINK-15201:
-

 Summary: Remove verifications in detach execution
 Key: FLINK-15201
 URL: https://issues.apache.org/jira/browse/FLINK-15201
 Project: Flink
  Issue Type: Improvement
  Components: Client / Job Submission
Reporter: Zili Chen
Assignee: Zili Chen
 Fix For: 1.10.0


> From [~aljoscha]: I think we actually don't need these "verifications" 
> anymore, with the new architecture where the Executor is called inside the 
> execute() method and where we don't actually "hijack" the method anymore, we 
> can actually have multiple execute() calls. We can address that in a 
> follow-up, though.





[jira] [Created] (FLINK-15200) legacy planner cannot deal Type with precision like DataTypes.TIMESTAMP(3) in TableSourceUtil

2019-12-11 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-15200:
--

 Summary: legacy planner cannot  deal Type with precision like 
DataTypes.TIMESTAMP(3) in TableSourceUtil
 Key: FLINK-15200
 URL: https://issues.apache.org/jira/browse/FLINK-15200
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Legacy Planner
Affects Versions: 1.10.0
Reporter: Leonard Xu
 Fix For: 1.10.0


org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
 at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:684)
 at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
 at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
 at org.apache.flink.table.client.cli.CliClient.callInsertInto(CliClient.java:535)
 at org.apache.flink.table.client.cli.CliClient.lambda$submitUpdate$0(CliClient.java:231)
 at java.util.Optional.map(Optional.java:215)
 at org.apache.flink.table.client.cli.CliClient.submitUpdate(CliClient.java:228)
 at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:129)
 at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
 at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: java.lang.RuntimeException: Error while applying rule PushProjectIntoTableSourceScanRule, args [rel#88:FlinkLogicalCalc.LOGICAL(input=RelSubset#87,expr#0..2={inputs},expr#3=IS NOT NULL($t1),user=$t1,rowtime=$t0,$condition=$t3), Scan(table:[default_catalog, default_database, JsonSourceTable], fields:(rowtime, user, event), source:KafkaTableSource(rowtime, user, event))]
 at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
 at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:631)
 at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
 at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:280)
 at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:199)
 at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66)
 at org.apache.flink.table.planner.StreamPlanner.writeToUpsertSink(StreamPlanner.scala:350)
 at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$writeToSink(StreamPlanner.scala:278)
 at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:166)
 at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:145)
 at scala.Option.map(Option.scala:146)
 at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:145)
 at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117)
 at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:117)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:661)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:482)
 at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:331)
 at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$14(LocalExecutor.java:676)
 at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:220)
 at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:674)
 ... 9 more
Caused by: org.apache.flink.table.api.ValidationException: Rowtime field 'rowtime' has invalid type LocalDateTime. Rowtime attributes must be of type Timestamp.
 at org.apache.flink.table.sources.TableSourceUtil$$anonfun$3.apply(TableSourceUtil.scala:114)
 at org.apache.flink.table.sources.TableSourceUtil$$anonfun$3.apply(TableSourceUtil.scala:92)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 

Re: [RESULT] [VOTE] Release 1.8.3, release candidate #3

2019-12-11 Thread Patrick Lucas
Thanks, Hequn!

The Dockerfiles for the Flink images on Docker Hub for the 1.8.3 release
are prepared[1] and I'll open a pull request upstream[2] once the release
announcement has gone out.

And stay tuned: I'm working on a proposal for integrating the publication
of these Docker images into the Flink release process and will send it out
to the dev list before or shortly after the holidays.

[1]
https://github.com/docker-flink/docker-flink/commit/4d85b71b7cf9fa4a38f8682ed13aa0f55445e32e
[2] https://github.com/docker-library/official-images/

On Wed, Dec 11, 2019 at 3:30 AM Hequn Cheng  wrote:

> Hi everyone,
>
> I'm happy to announce that we have unanimously approved this release.
>
> There are 10 approving votes, 3 of which are binding:
> * Jincheng (binding)
> * Ufuk (binding)
> * Till (binding)
> * Jingsong
> * Fabian
> * Danny
> * Yang Wang
> * Dian
> * Wei
> * Hequn
>
> There are no disapproving votes.
>
> Thanks everyone!
>


Re: [DISCUSS] Need feedback on Azure-based build system

2019-12-11 Thread Robert Metzger
Some comments on Chesnay's message:
- Changing the number of splits will not reduce the complexity.
- One can also use the Flink build machines by opening a PR to the
"flink-ci/flink" repo, no need to open crappy PRs :)
- On the number of builds being run: We currently use 4 out of 10 machines
offered by Alibaba, and we are not yet hitting any limits. In addition to
that, another big cloud provider has reached out to us, offering build
capacity.

But generally, I agree that solely relying on the build infrastructure of
Flink is not a good option. The free Azure builds should provide a
reasonable experience.


On Wed, Dec 11, 2019 at 3:22 PM Chesnay Schepler  wrote:

> Note that for B it's not strictly necessary to maintain the current
> number of splits; 2 might already be enough to bring contributor builds
> to a more reasonable level.
>
> I don't think that a contributor build taking 3,5h is a viable option;
> people will start disregarding their own instance and just open a PR
> without having run the tests, which will naturally mean that PR quality
> will drop. Committers probably will start working around this and push
> branches into the flink repo for running tests; we have seen that in the
> past and see this currently for e2e tests.
>
> This will increase the number of builds being run on the Flink machines
> by quite a bit, obviously affecting throughput and latency..
>
> On 11/12/2019 14:59, Arvid Heise wrote:
> > Hi Robert,
> >
> > thank you very much for raising this issue and improving the build
> system.
> >
> > For now, I'd like to stick to a lean solution (= option A).
> >
> > While option B can greatly reduce build times, it also has the habit of
> > clogging up the build machines. Just some arbitrary numbers, but it
> > currently feels like B cuts down latency by half but also uses 10
> machines
> > for 30 minutes, decreasing the overall throughput significantly. Thus,
> when
> > many folks want to see their commits tested, resources quickly run out
> and
> > this in turn significantly increases latency.
> > I'd like to have some more predictable build times and sacrifice some
> > latency for now.
> >
> > It would be interesting to see if we could rearrange the project
> execution
> > in Maven, such that fast projects are executed first. E2E tests should be
> > executed last, which they are somewhat, because of the project
> dependencies.
> >
> > Of course, I'm very interested to improve the overall build experience by
> > exploring other options to Maven.
> >
> > Best,
> >
> > Arvid
> >
> > On Wed, Dec 11, 2019 at 2:32 PM Robert Metzger 
> wrote:
> >
> >> Hey devs,
> >>
> >> I need your opinion on something: As part of our migration from Travis
> to
> >> Azure, I'm revisiting the build system of Flink. I currently see two
> >> different ways of proceeding, and I would like to know your opinion on
> the
> >> two options.
> >>
> >> A) We build and test Flink in one "mvn clean verify" call on the CI
> system.
> >> B) We migrate the two staged build of one compile and N test jobs to
> Azure.
> >>
> >> Option A) is what we are currently running as part of testing the
> >> Azure-based system.
> >>
> >> Pro/Cons for A)
> >> + for "apache/flink" pushes and pull requests, the big testing machines
> >> need 1:30 hours to complete (this might go up for a few minutes because
> the
> >> python tests, and some auxiliary tests are not executed yet)
> >> + Our build will be easier to maintain and understand, because we rely
> on
> >> fewer scripts
> >> - builds on Flink forks, using the free Azure plan currently take 3:30
> >> hours to complete.
> >>
> >> Pro/Cons for B)
> >> + builds on Flink forks using the free Azure plan take 1:20 hours,
> >> + Builds take 1:20 hours on the big testing machines
> >> - maintenance and complexity of the build scripts
> >> - the build times are a lot less predictable, because they depend on the
> >> availability of workers. For the free plan builds, they are currently
> fast,
> >> because the test stage has 10 jobs, and Azure offers 10 parallel
> workers.
> >> We currently only have a total of 8 big machines, so there will always
> be
> >> some queueing. In practice, for the "apache/flink" repo, build times
> will
> >> be less favorable, because of the scheduling.
> >>
> >>
> >> In my opinion, the question is mostly: Are you okay to wait 3.5 hours
> for a
> >> build to finish on your private CI, in favor of a less complex build
> >> system?
> >> Ideally, we'll be able to reduce these 3.5 hours by using a more modern
> >> build tool ("gradle") in the future.
> >>
> >> I'm happy to hear your thoughts!
> >>
> >> Best,
> >> Robert
> >>
>
>


[jira] [Created] (FLINK-15199) Benchmarks are not compiling

2019-12-11 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-15199:
--

 Summary: Benchmarks are not compiling
 Key: FLINK-15199
 URL: https://issues.apache.org/jira/browse/FLINK-15199
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Affects Versions: 1.10.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.10.0



{noformat}
[ERROR] COMPILATION ERROR : 
[INFO] -
[ERROR] /home/jenkins/workspace/flink-master-benchmarks/flink-benchmarks/src/main/java/org/apache/flink/state/benchmark/BackendUtils.java:[61,57] cannot infer type arguments for org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder<>
[INFO] 1 error
[INFO] -
[INFO] 
[INFO] BUILD FAILURE
[INFO] 
[INFO] Total time: 2.635 s
[INFO] Finished at: 2019-12-11T14:58:37+01:00
[INFO] Final Memory: 31M/751M
[INFO] 
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink-hackathon-benchmarks: Compilation failure
[ERROR] /home/jenkins/workspace/flink-master-benchmarks/flink-benchmarks/src/main/java/org/apache/flink/state/benchmark/BackendUtils.java:[61,57] cannot infer type arguments for org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder<>
{noformat}






Re: [DISCUSS] Register user jar files in {Stream}ExecutionEnvironment

2019-12-11 Thread Jark Wu
I think the configuration option "pipeline.jars" [1] works for you, because SQL
Client supports --jars to load user jars and it also uses this option internally.

But I'm not an expert on this; maybe Kostas and Aljoscha can give
a definitive answer.

Best,
Jark

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#pipeline-jars

On Wed, 11 Dec 2019 at 15:41, Jingsong Li  wrote:

> Hi Leo,
>
> I think run job with external jars is important too.
> Have you took a look to PipelineOptions.JARS in configuration?
> I think this is a way to set external jars. And SQL-CLI need it too.
>
> Best,
> Jingsong Lee
>
> On Wed, Dec 11, 2019 at 9:18 AM 50man  wrote:
>
> > Hi everyone,
> >
> >
> > I propose an important API feature to register user jar files in
> > {Stream}ExecutionEnvironment.
> >
> > The proposal is here,
> >
> > https://issues.apache.org/jira/browse/FLINK-14319
> >
> > Let's discuss it and I hope we can acquire a consent ASAP cause this
> > feature is potentially helpful for some release issues about version
> > 1.10 on sub-task FLINK-14055
> >  under task
> > FLINK-10232  ;-).
> >
> >
> > Best,
> >
> > Leo
> >
> >
>
> --
> Best, Jingsong Lee
>


Re: [DISCUSS] Need feedback on Azure-based build system

2019-12-11 Thread Chesnay Schepler
Note that for B it's not strictly necessary to maintain the current 
number of splits; 2 might already be enough to bring contributor builds 
to a more reasonable level.


I don't think that a contributor build taking 3.5h is a viable option; 
people will start disregarding their own instance and just open a PR 
without having run the tests, which will naturally mean that PR quality 
will drop. Committers probably will start working around this and push 
branches into the flink repo for running tests; we have seen that in the 
past and see this currently for e2e tests.


This will increase the number of builds being run on the Flink machines 
by quite a bit, obviously affecting throughput and latency.


On 11/12/2019 14:59, Arvid Heise wrote:

Hi Robert,

thank you very much for raising this issue and improving the build system.

For now, I'd like to stick to a lean solution (= option A).

While option B can greatly reduce build times, it also has the habit of
clogging up the build machines. Just some arbitrary numbers, but it
currently feels like B cuts down latency by half but also uses 10 machines
for 30 minutes, decreasing the overall throughput significantly. Thus, when
many folks want to see their commits tested, resources quickly run out and
this in turn significantly increases latency.
I'd like to have some more predictable build times and sacrifice some
latency for now.

It would be interesting to see if we could rearrange the project execution
in Maven, such that fast projects are executed first. E2E tests should be
executed last, which they are somewhat, because of the project dependencies.

Of course, I'm very interested to improve the overall build experience by
exploring other options to Maven.

Best,

Arvid

On Wed, Dec 11, 2019 at 2:32 PM Robert Metzger  wrote:


Hey devs,

I need your opinion on something: As part of our migration from Travis to
Azure, I'm revisiting the build system of Flink. I currently see two
different ways of proceeding, and I would like to know your opinion on the
two options.

A) We build and test Flink in one "mvn clean verify" call on the CI system.
B) We migrate the two staged build of one compile and N test jobs to Azure.

Option A) is what we are currently running as part of testing the
Azure-based system.

Pro/Cons for A)
+ for "apache/flink" pushes and pull requests, the big testing machines
need 1:30 hours to complete (this might go up for a few minutes because the
python tests, and some auxiliary tests are not executed yet)
+ Our build will be easier to maintain and understand, because we rely on
fewer scripts
- builds on Flink forks, using the free Azure plan currently take 3:30
hours to complete.

Pro/Cons for B)
+ builds on Flink forks using the free Azure plan take 1:20 hours,
+ Builds take 1:20 hours on the big testing machines
- maintenance and complexity of the build scripts
- the build times are a lot less predictable, because they depend on the
availability of workers. For the free plan builds, they are currently fast,
because the test stage has 10 jobs, and Azure offers 10 parallel workers.
We currently only have a total of 8 big machines, so there will always be
some queueing. In practice, for the "apache/flink" repo, build times will
be less favorable, because of the scheduling.


In my opinion, the question is mostly: Are you okay to wait 3.5 hours for a
build to finish on your private CI, in favor of a less complex build
system?
Ideally, we'll be able to reduce these 3.5 hours by using a more modern
build tool ("gradle") in the future.

I'm happy to hear your thoughts!

Best,
Robert





Re: [DISCUSS] Need feedback on Azure-based build system

2019-12-11 Thread Arvid Heise
Hi Robert,

thank you very much for raising this issue and improving the build system.

For now, I'd like to stick to a lean solution (= option A).

While option B can greatly reduce build times, it also has the habit of
clogging up the build machines. Just some arbitrary numbers, but it
currently feels like B cuts down latency by half but also uses 10 machines
for 30 minutes, decreasing the overall throughput significantly. Thus, when
many folks want to see their commits tested, resources quickly run out and
this in turn significantly increases latency.
I'd like to have some more predictable build times and sacrifice some
latency for now.

It would be interesting to see if we could rearrange the project execution
in Maven, such that fast projects are executed first. E2E tests should be
executed last, which they are somewhat, because of the project dependencies.

Of course, I'm very interested in improving the overall build experience by
exploring alternatives to Maven.

Best,

Arvid

On Wed, Dec 11, 2019 at 2:32 PM Robert Metzger  wrote:

> Hey devs,
>
> I need your opinion on something: As part of our migration from Travis to
> Azure, I'm revisiting the build system of Flink. I currently see two
> different ways of proceeding, and I would like to know your opinion on the
> two options.
>
> A) We build and test Flink in one "mvn clean verify" call on the CI system.
> B) We migrate the two staged build of one compile and N test jobs to Azure.
>
> Option A) is what we are currently running as part of testing the
> Azure-based system.
>
> Pro/Cons for A)
> + for "apache/flink" pushes and pull requests, the big testing machines
> need 1:30 hours to complete (this might go up for a few minutes because the
> python tests, and some auxiliary tests are not executed yet)
> + Our build will be easier to maintain and understand, because we rely on
> fewer scripts
> - builds on Flink forks, using the free Azure plan currently take 3:30
> hours to complete.
>
> Pro/Cons for B)
> + builds on Flink forks using the free Azure plan take 1:20 hours,
> + Builds take 1:20 hours on the big testing machines
> - maintenance and complexity of the build scripts
> - the build times are a lot less predictable, because they depend on the
> availability of workers. For the free plan builds, they are currently fast,
> because the test stage has 10 jobs, and Azure offers 10 parallel workers.
> We currently only have a total of 8 big machines, so there will always be
> some queueing. In practice, for the "apache/flink" repo, build times will
> be less favorable, because of the scheduling.
>
>
> In my opinion, the question is mostly: Are you okay to wait 3.5 hours for a
> build to finish on your private CI, in favor of a less complex build
> system?
> Ideally, we'll be able to reduce these 3.5 hours by using a more modern
> build tool ("gradle") in the future.
>
> I'm happy to hear your thoughts!
>
> Best,
> Robert
>


[DISCUSS] Need feedback on Azure-based build system

2019-12-11 Thread Robert Metzger
Hey devs,

I need your opinion on something: As part of our migration from Travis to
Azure, I'm revisiting the build system of Flink. I currently see two
different ways of proceeding, and I would like to know your opinion on the
two options.

A) We build and test Flink in one "mvn clean verify" call on the CI system.
B) We migrate the two-stage build of one compile job and N test jobs to Azure.

Option A) is what we are currently running as part of testing the
Azure-based system.

Pro/Cons for A)
+ for "apache/flink" pushes and pull requests, the big testing machines
need 1:30 hours to complete (this might go up by a few minutes, because the
Python tests and some auxiliary tests are not executed yet)
+ Our build will be easier to maintain and understand, because we rely on
fewer scripts
- builds on Flink forks, using the free Azure plan currently take 3:30
hours to complete.

Pro/Cons for B)
+ builds on Flink forks using the free Azure plan take 1:20 hours
+ builds take 1:20 hours on the big testing machines
- maintenance and complexity of the build scripts
- the build times are a lot less predictable, because they depend on the
availability of workers. For the free plan builds, they are currently fast,
because the test stage has 10 jobs, and Azure offers 10 parallel workers.
We currently only have a total of 8 big machines, so there will always be
some queueing. In practice, for the "apache/flink" repo, build times will
be less favorable, because of the scheduling.
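
A rough sketch of the queueing argument, assuming every test job takes about the same time and no other build competes for the workers (neither holds in practice). The class and method names are made up for illustration:

```java
// Back-of-the-envelope sketch: how many sequential "waves" of test jobs are
// needed when a stage has more jobs than there are free workers.
// Assumption: all jobs take equally long and no other build is competing.
class BuildWaves {

    // Number of sequential waves needed to run `jobs` on `workers` machines.
    static int waves(int jobs, int workers) {
        if (jobs < 0 || workers <= 0) {
            throw new IllegalArgumentException("jobs must be >= 0 and workers > 0");
        }
        return (jobs + workers - 1) / workers; // integer ceiling of jobs / workers
    }

    public static void main(String[] args) {
        // Free Azure plan: 10 test jobs, 10 parallel workers.
        System.out.println("free plan waves: " + waves(10, 10));
        // Big testing machines: 10 test jobs, 8 machines.
        System.out.println("big machine waves: " + waves(10, 8));
    }
}
```

Under these assumptions the free plan finishes the test stage in one wave, while 8 machines need two waves for the same 10 jobs, even before other builds are taken into account.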


In my opinion, the question is mostly: Are you okay with waiting 3.5 hours for
a build to finish on your private CI, in exchange for a less complex build
system?
Ideally, we'll be able to reduce these 3.5 hours by using a more modern
build tool ("gradle") in the future.

I'm happy to hear your thoughts!

Best,
Robert


[jira] [Created] (FLINK-15198) Remove deprecated mesos.resourcemanager.tasks.mem in 1.11

2019-12-11 Thread Andrey Zagrebin (Jira)
Andrey Zagrebin created FLINK-15198:
---

 Summary: Remove deprecated mesos.resourcemanager.tasks.mem in 1.11
 Key: FLINK-15198
 URL: https://issues.apache.org/jira/browse/FLINK-15198
 Project: Flink
  Issue Type: Task
  Components: Deployment / Mesos
Affects Versions: 1.11.0
Reporter: Andrey Zagrebin
 Fix For: 1.11.0


In FLINK-15082, we deprecated 'mesos.resourcemanager.tasks.mem' in favour of 
the new unified option 'taskmanager.memory.total-process.size' from FLIP-49. We 
should remove it now in 1.11.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-11 Thread Piotr Nowojski
Hi,

Regarding:

Collection getNextRecords()

I’m pretty sure such a design would unfortunately impact performance 
(accessing and potentially creating the collection on the hot path).

Also,

InputStatus emitNext(DataOutput output) throws Exception;
or
Status pollNext(SourceOutput sourceOutput) throws Exception;

give us some opportunities in the future to allow the Source to hot loop 
inside until it receives a signal to exit for some reason (the output 
collector could return such a hint upon collecting the result). But that’s 
another topic outside of this FLIP’s scope.
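
A minimal sketch of the difference between the two styles. The interfaces below are simplified, made-up stand-ins and not the actual FLIP-27 signatures; the point is only that the push style does not need to allocate a collection per call:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified stand-ins for the interfaces discussed in this thread.
class ReaderStyles {

    enum Status { MORE_AVAILABLE, END_OF_INPUT }

    interface SourceOutput<T> {
        void collect(T record);
    }

    // Style 1: the reader hands back a collection on every call.
    interface BatchReader<T> {
        Collection<T> getNextRecords();
    }

    // Style 2: the reader pushes records into an output supplied by the caller.
    interface PushReader<T> {
        Status pollNext(SourceOutput<T> output) throws Exception;
    }

    // Emits the integers [0, limit), one record per pollNext call.
    static final class CountingReader implements PushReader<Integer> {
        private final int limit;
        private int next;

        CountingReader(int limit) {
            this.limit = limit;
        }

        @Override
        public Status pollNext(SourceOutput<Integer> output) {
            if (next < limit) {
                output.collect(next++); // no intermediate collection is allocated
            }
            return next < limit ? Status.MORE_AVAILABLE : Status.END_OF_INPUT;
        }
    }

    // Drives a push-style reader until it reports the end of input.
    static <T> List<T> drain(PushReader<T> reader) {
        List<T> sink = new ArrayList<>();
        SourceOutput<T> output = sink::add;
        try {
            while (reader.pollNext(output) == Status.MORE_AVAILABLE) {
                // keep polling
            }
        } catch (Exception e) {
            throw new IllegalStateException("reader failed", e);
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(drain(new CountingReader(3)));
    }
}
```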

Piotrek

> On 11 Dec 2019, at 10:41, Till Rohrmann  wrote:
> 
> Hi Becket,
> 
> quick clarification from my side because I think you misunderstood my
> question. I did not suggest to let the SourceReader return only a single
> record at a time when calling getNextRecords. As the return type indicates,
> the method can return an arbitrary number of records.
> 
> Cheers,
> Till
> 
> On Wed, Dec 11, 2019 at 10:13 AM Dawid Wysakowicz wrote:
> 
>> Hi Becket,
>> 
>> Issue #1 - Design of Source interface
>> 
>> I mentioned the lack of a method like Source#createEnumerator(Boundedness
>> boundedness, SplitEnumeratorContext context), because without it the current
>> proposal is not complete/does not work.
>> 
>> If we say that boundedness is an intrinsic property of a source imo we
>> don't need the Source#createEnumerator(Boundedness boundedness,
>> SplitEnumeratorContext context) method.
>> 
>> Assuming a source from my previous example:
>> 
>> Source source = KafkaSource.builder()
>>  ...
>>  .untilTimestamp(...)
>>  .build()
>> 
>> Would the enumerator differ if created like
>> source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs source
>> .createEnumerator(BOUNDED, ...)? I know I am repeating myself, but this is
>> the part that my opinion differ the most from the current proposal. I
>> really think it should always be the source that tells if it is bounded or
>> not. In the current proposal methods continousSource/boundedSource somewhat
>> reconfigure the source, which I think is misleading.
>> 
>> I think a call like:
>> 
>> Source source = KafkaSource.builder()
>>  ...
>>  .readContinously() / readUntilLatestOffset() / readUntilTimestamp / 
>> readUntilOffsets / ...
>>  .build()
>> 
>> is way cleaner (and expressive) than
>> 
>> Source source = KafkaSource.builder()
>>  ...
>>  .build()
>> 
>> 
>> env.continousSource(source) // which actually underneath would call 
>> createEnumerator(CONTINUOUS, ctx) which would be equivalent to 
>> source.readContinously().createEnumerator(ctx)
>> // or
>> env.boundedSource(source) // which actually underneath would call 
>> createEnumerator(BOUNDED, ctx) which would be equivalent to 
>> source.readUntilLatestOffset().createEnumerator(ctx)
>> 
>> 
>> Sorry for the comparison, but to me it seems there is too much magic
>> happening underneath those two calls.
>> 
>> I really believe the Source interface should have getBoundedness method
>> instead of (supportBoundedness) + createEnumerator(Boundedness, ...)
>> 
>> 
>> Issue #2 - Design of
>> ExecutionEnvironment#source()/continuousSource()/boundedSource()
>> 
>> As you might have guessed I am slightly in favor of option #2 modified.
>> Yes I am aware every step of the dag would have to be able to say if it is
>> bounded or not. I have a feeling it would be easier to express cross
>> bounded/unbounded operations, but I must admit I have not thought it
>> through thoroughly, In the spirit of batch is just a special case of
>> streaming I thought BoundedStream would extend from DataStream. Correct me
>> if I am wrong. In such a setup the cross bounded/unbounded operation could
>> be expressed quite easily I think:
>> 
>> DataStream {
>>  DataStream join(DataStream, ...); // we could not really tell if the result 
>> is bounded or not, but because bounded stream is a special case of unbounded 
>> the API object is correct, irrespective if the left or right side of the 
>> join is bounded
>> }
>> 
>> BoundedStream extends DataStream {
>>  BoundedStream join(BoundedStream, ...); // only if both sides are bounded 
>> the result can be bounded as well. However we do have access to the 
>> DataStream#join here, so you can still join with a DataStream
>> }
>> 
>> 
>> On the other hand I also see benefits of two completely disjointed APIs,
>> as we could prohibit some streaming calls in the bounded API. I can't think
>> of any unbounded operators that could not be implemented for bounded stream.
>> 
>> Besides I think we both agree we don't like the method:
>> 
>> DataStream boundedStream(Source)
>> 
>> suggested in the current state of the FLIP. Do we ? :)
>> 
>> Best,
>> 
>> Dawid
>> 
>> On 10/12/2019 18:57, Becket Qin wrote:
>> 
>> Hi folks,
>> 
>> Thanks for the discussion, great feedback. Also thanks Dawid for the
>> explanation, it is much clearer now.
>> 
>> One thing that is indeed missing from the 

[jira] [Created] (FLINK-15197) Add resource related config options to dynamical properties for Kubernetes

2019-12-11 Thread Yang Wang (Jira)
Yang Wang created FLINK-15197:
-

 Summary: Add resource related config options to dynamical 
properties for Kubernetes
 Key: FLINK-15197
 URL: https://issues.apache.org/jira/browse/FLINK-15197
 Project: Flink
  Issue Type: Sub-task
Reporter: Yang Wang


Since FLIP-49 and FLIP-53 have been completely merged, the newly introduced 
resource config options should be passed to the taskmanager via dynamic 
properties for Kubernetes. The dynamic properties are generated in 
{{KubernetesResourceManager}}.

{{TaskExecutorResourceUtils#generateDynamicConfigsStr}} could be used to 
generate the dynamic properties.
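
As a rough illustration of the idea only: the helper below is hypothetical and is not the Flink implementation, and both the "-D key=value" rendering and the second option key are assumptions made for this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper; it only illustrates turning config options into
// dynamic properties and is not the Flink implementation.
class DynamicProperties {

    // Renders each entry as "-D key=value", separated by single spaces.
    static String render(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(" ");
        for (Map.Entry<String, String> e : options.entrySet()) {
            joiner.add("-D " + e.getKey() + "=" + e.getValue());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("taskmanager.memory.total-process.size", "1024m");
        options.put("example.option", "some-value"); // made-up key for illustration
        System.out.println(render(options));
    }
}
```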

 

cc [~xintongsong]





[jira] [Created] (FLINK-15196) The mesos.resourcemanager.tasks.cpus configuration does not work as expectation

2019-12-11 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-15196:
--

 Summary: The mesos.resourcemanager.tasks.cpus configuration does 
not work as expectation
 Key: FLINK-15196
 URL: https://issues.apache.org/jira/browse/FLINK-15196
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Mesos
Reporter: Yangze Guo
 Fix For: 1.10.0


When I tried to set the number of CPUs of Mesos workers, I found that the 
"mesos.resourcemanager.tasks.cpus" configuration does not work anymore. After 
going through recent commits, I guess the root cause is 
[FLINK-14188|https://github.com/apache/flink/pull/10146]. The configuration key 
in MesosTaskManagerParameters#getCpuCores is incorrect.

[~azagrebin] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Open Method is not being called in case of AggregateFunction UDFs

2019-12-11 Thread Timo Walther

At least I hope it has been fixed. Which version and planner are you using?


On 11.12.19 11:47, Arujit Pradhan wrote:

Hi Timo,

Thanks for the bug reference.

You mentioned that this bug has been fixed. Is the fix available for 
Flink 1.9+ and the default query planner?


Thanks and regards,
/Arujit/

On Wed, Dec 11, 2019 at 3:56 PM Timo Walther wrote:


I remember that we fixed some bug around this topic recently. The legacy
planner should not be affected.

There is another user reporting this:
https://issues.apache.org/jira/browse/FLINK-15040

Regards,
Timo

On 11.12.19 10:34, Dawid Wysakowicz wrote:
 > Hi Arujit,
 >
 > Could you also share the query where you use this UDF? It would also
 > help if you said which version of Flink you are using and which planner.
 >
 > Best,
 >
 > Dawid
 >
 > On 11/12/2019 10:21, Arujit Pradhan wrote:
 >> Hi all,
 >>
 >> So we are creating some User Defined Functions of type
 >> AggregateFunction. And we want to send some static metrics from the
 >> *open()* method of the UDFs, as we can get the *MetricGroup* from the
 >> *FunctionContext*, which is only exposed in the open method. Our code
 >> looks something like this (an implementation of count distinct
 >> in SQL):
 >> public class DistinctCount extends AggregateFunction<Integer, DistinctCountAccumulator> {
 >>     @Override
 >>     public DistinctCountAccumulator createAccumulator() {
 >>         return new DistinctCountAccumulator();
 >>     }
 >>     @Override
 >>     public void open(FunctionContext context) throws Exception {
 >>         super.open(context);
 >>         MetricGroup metricGroup = context.getMetricGroup();
 >>         // add some metric to the group here
 >>         System.out.println("in the open of UDF");
 >>     }
 >>     @Override
 >>     public void close() throws Exception {
 >>         super.close();
 >>     }
 >>     @Override
 >>     public Integer getValue(DistinctCountAccumulator distinctCountAccumulator) {
 >>         System.out.println("in the udf");
 >>         return distinctCountAccumulator.count();
 >>     }
 >>     public void accumulate(DistinctCountAccumulator distinctCountAccumulator, String item) {
 >>         if (item == null) {
 >>             return;
 >>         }
 >>         distinctCountAccumulator.add(item);
 >>     }
 >> }
 >>
 >> But when we use this UDF in Flink SQL, it seems like the open method is
 >> not being called at all.
 >>
 >> From the Flink UDF documentation we find:
 >>
 >> *The |open()| method is called once before the evaluation method. The
 >> |close()| method after the last call to the evaluation method.*
 >>
 >> *The |open()| method provides a |FunctionContext| that contains
 >> information about the context in which user-defined functions are
 >> executed, such as the metric group, the distributed cache files, or
 >> the global job parameters.*
 >>
 >> Is there any reason that open is not working in AggregateFunctions?
 >> By the way, it works fine in the case of ScalarFunctions. Is there any
 >> alternative scope where we can register some static metrics in a UDF?
 >>
 >>
 >> Thanks and regards,
 >> /Arujit/
 >>





[jira] [Created] (FLINK-15195) Remove unu

2019-12-11 Thread Kostas Kloudas (Jira)
Kostas Kloudas created FLINK-15195:
--

 Summary: Remove unu
 Key: FLINK-15195
 URL: https://issues.apache.org/jira/browse/FLINK-15195
 Project: Flink
  Issue Type: Sub-task
Reporter: Kostas Kloudas








Re: [DISCUSS] Adding e2e tests for Flink's Mesos integration

2019-12-11 Thread Gary Yao
Thanks for driving this effort. Also +1 from my side. I have left a few
questions below.

> - Wordcount end-to-end test. For verifying the basic process of Mesos
> deployment.

Would this add additional test coverage compared to the
"multiple submissions" test case? I am asking because the E2E tests are
already expensive to run, and adding new tests should be carefully considered.

> - State TTL RocksDb backend end-to-end test. For verifying memory
> configuration behaviors, since Mesos has its own config options and
> logic.

Can you elaborate more on this? Which config options are relevant here?

On Wed, Dec 11, 2019 at 9:58 AM Till Rohrmann  wrote:

> +1 for building the image locally. If the need arises, we can always
> change it later.
>
> Cheers,
> Till
>
> On Wed, Dec 11, 2019 at 4:05 AM Xintong Song 
> wrote:
>
> > Thanks, Yangze.
> >
> > +1 for building the image locally.
> > The time consumption for both building the image locally and pulling it from
> > DockerHub sounds reasonable and affordable. Therefore, I'm also in favor of
> > avoiding the cost of maintaining a custom image.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Wed, Dec 11, 2019 at 10:11 AM Yangze Guo  wrote:
> >
> > > Thanks for the feedback, Yang.
> > >
> > > Some updates I want to share in this thread.
> > > I have built a PoC version of the Mesos e2e test with the WordCount
> > > workflow.[1] Then, I ran it in the testing environment. The results
> > > are shown here[2]:
> > > - Pulling the image from DockerHub took 1 minute and 21 seconds.
> > > - Building it locally took 2 minutes and 54 seconds.
> > >
> > > I prefer building it locally. Although it is slower, I think the time
> > > overhead of building or pulling the image is trivial compared to the
> > > cost of maintaining the image in DockerHub and to the whole test
> > > process.
> > >
> > > I look forward to hearing from you. ;)
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > [1]
> > >
> >
> https://github.com/KarmaGYZ/flink/commit/0406d942446a1b17f81d93235b21a829bf88ccf0
> > > [2]https://travis-ci.org/KarmaGYZ/flink/jobs/623207957
> > > Best,
> > > Yangze Guo
> > >
> > > On Mon, Dec 9, 2019 at 2:39 PM Yang Wang 
> wrote:
> > > >
> > > > Thanks Yangze for starting this discussion.
> > > >
> > > > Just share my thoughts.
> > > >
> > > > If the mesos official docker image could not meet our requirement, i
> > > suggest to build the image locally.
> > > > We have done the same things for yarn e2e tests. This way is more
> > > flexible and easy to maintain. However,
> > > > i have no idea how long building the mesos image locally will take.
> > > Based on previous experience of yarn, i
> > > > think it may not take too much time.
> > > >
> > > >
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > > On Sat, Dec 7, 2019 at 4:25 PM Yangze Guo wrote:
> > > >>
> > > >> Thanks for your feedback!
> > > >>
> > > >> @Till
> > > >> Regarding the time overhead, I think it mainly comes from the network
> > > >> transmission. Building the image locally downloads 260MB of files in
> > > >> total, including the base image and packages. For pulling from
> > > >> DockerHub, the compressed size of the image is 347MB. Thus, I agree
> > > >> that it is ok to build the image locally.
> > > >>
> > > >> @Piyush
> > > >> Thank you for offering the help and sharing your usage scenario. In
> > > >> current stage, I think it will be really helpful if you can compress
> > > >> the custom image[1] or reduce the time overhead to build it locally.
> > > >> Any ideas for improving test coverage will also be appreciated.
> > > >>
> > > >> [1]
> > >
> >
> https://hub.docker.com/layers/karmagyz/mesos-flink/latest/images/sha256-4e1caefea107818aa11374d6ac8a6e889922c81806f5cd791ead141f18ec7e64
> > > >>
> > > >> Best,
> > > >> Yangze Guo
> > > >>
> > > >> On Sat, Dec 7, 2019 at 3:17 AM Piyush Narang 
> > > wrote:
> > > >> >
> > > >> > +1 from our end as well. At Criteo, we are running some Flink jobs
> > on
> > > Mesos in production to compute short term features for machine
> learning.
> > > We’d love to help out and contribute on this initiative.
> > > >> >
> > > >> > Thanks,
> > > >> > -- Piyush
> > > >> >
> > > >> >
> > > >> > From: Till Rohrmann 
> > > >> > Date: Friday, December 6, 2019 at 8:10 AM
> > > >> > To: dev 
> > > >> > Cc: user 
> > > >> > Subject: Re: [DISCUSS] Adding e2e tests for Flink's Mesos
> > integration
> > > >> >
> > > >> > Big +1 for adding a fully working e2e test for Flink's Mesos
> > > integration. Ideally we would have it ready for the 1.10 release. The
> > lack
> > > of such a test has bitten us already multiple times.
> > > >> >
> > > >> > In general I would prefer to use the official image if possible
> > since
> > > it frees us from maintaining our own custom image. Since Java 9 is no
> > > longer officially supported as we opted for supporting Java 11 (LTS) it
> > > might not be feasible, though. How much longer would building the
> custom
> > > image vs. 

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-11 Thread Till Rohrmann
Hi Becket,

quick clarification from my side because I think you misunderstood my
question. I did not suggest to let the SourceReader return only a single
record at a time when calling getNextRecords. As the return type indicates,
the method can return an arbitrary number of records.

Cheers,
Till

On Wed, Dec 11, 2019 at 10:13 AM Dawid Wysakowicz 
wrote:

> Hi Becket,
>
> Issue #1 - Design of Source interface
>
> I mentioned the lack of a method like Source#createEnumerator(Boundedness
> boundedness, SplitEnumeratorContext context), because without it the current
> proposal is not complete/does not work.
>
> If we say that boundedness is an intrinsic property of a source imo we
> don't need the Source#createEnumerator(Boundedness boundedness,
> SplitEnumeratorContext context) method.
>
> Assuming a source from my previous example:
>
> Source source = KafkaSource.builder()
>   ...
>   .untilTimestamp(...)
>   .build()
>
> Would the enumerator differ if created like
> source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs source
> .createEnumerator(BOUNDED, ...)? I know I am repeating myself, but this is
> the part that my opinion differ the most from the current proposal. I
> really think it should always be the source that tells if it is bounded or
> not. In the current proposal methods continousSource/boundedSource somewhat
> reconfigure the source, which I think is misleading.
>
> I think a call like:
>
> Source source = KafkaSource.builder()
>   ...
>   .readContinously() / readUntilLatestOffset() / readUntilTimestamp / 
> readUntilOffsets / ...
>   .build()
>
> is way cleaner (and expressive) than
>
> Source source = KafkaSource.builder()
>   ...
>   .build()
>
>
> env.continousSource(source) // which actually underneath would call 
> createEnumerator(CONTINUOUS, ctx) which would be equivalent to 
> source.readContinously().createEnumerator(ctx)
> // or
> env.boundedSource(source) // which actually underneath would call 
> createEnumerator(BOUNDED, ctx) which would be equivalent to 
> source.readUntilLatestOffset().createEnumerator(ctx)
>
>
> Sorry for the comparison, but to me it seems there is too much magic
> happening underneath those two calls.
>
> I really believe the Source interface should have getBoundedness method
> instead of (supportBoundedness) + createEnumerator(Boundedness, ...)
>
>
> Issue #2 - Design of
> ExecutionEnvironment#source()/continuousSource()/boundedSource()
>
> As you might have guessed I am slightly in favor of option #2 modified.
> Yes I am aware every step of the dag would have to be able to say if it is
> bounded or not. I have a feeling it would be easier to express cross
> bounded/unbounded operations, but I must admit I have not thought it
> through thoroughly, In the spirit of batch is just a special case of
> streaming I thought BoundedStream would extend from DataStream. Correct me
> if I am wrong. In such a setup the cross bounded/unbounded operation could
> be expressed quite easily I think:
>
> DataStream {
>   DataStream join(DataStream, ...); // we could not really tell if the result 
> is bounded or not, but because bounded stream is a special case of unbounded 
> the API object is correct, irrespective if the left or right side of the join 
> is bounded
> }
>
> BoundedStream extends DataStream {
>   BoundedStream join(BoundedStream, ...); // only if both sides are bounded 
> the result can be bounded as well. However we do have access to the 
> DataStream#join here, so you can still join with a DataStream
> }
>
>
> On the other hand I also see benefits of two completely disjointed APIs,
> as we could prohibit some streaming calls in the bounded API. I can't think
> of any unbounded operators that could not be implemented for bounded stream.
>
> Besides I think we both agree we don't like the method:
>
> DataStream boundedStream(Source)
>
> suggested in the current state of the FLIP. Do we ? :)
>
> Best,
>
> Dawid
>
> On 10/12/2019 18:57, Becket Qin wrote:
>
> Hi folks,
>
> Thanks for the discussion, great feedback. Also thanks Dawid for the
> explanation, it is much clearer now.
>
> One thing that is indeed missing from the FLIP is how the boundedness is
> passed to the Source implementation. So the API should be
> Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext
> context)
> And we can probably remove the Source#supportBoundedness(Boundedness
> boundedness) method.
>
> Assuming we have that, we are essentially choosing from one of the
> following two options:
>
> Option 1:
> // The source is continuous source, and only unbounded operations can be
> performed.
> DataStream datastream = env.continuousSource(someSource);
>
> // The source is bounded source, both bounded and unbounded operations can
> be performed.
> BoundedDataStream boundedDataStream = env.boundedSource(someSource);
>
>   - Pros:
>a) explicit boundary between bounded / unbounded streams, it is
> quite simple and clear to the users.
>   - Cons:
>a) For 

[DISCUSS] Overwrite and partition inserting support in 1.10

2019-12-11 Thread Jingsong Li
Hi Dev,

After cutting the 1.10 branch, I tried the following features in the
SQL-CLI and found that they are not supported:
- insert overwrite
- PARTITION (partcol1=val1, partcol2=val2 ...)
The SQL pattern is:
INSERT { INTO | OVERWRITE } TABLE tablename1 [PARTITION (partcol1=val1,
partcol2=val2 ...)] select_statement1 FROM from_statement;
This came as a surprise to me.
The reason is that we only allow these two grammars in the hive dialect, and
the SQL-CLI does not have an interface to switch dialects.
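
For concreteness, the small sketch below renders statements that follow the pattern quoted above. It is only an illustration: the helper is hypothetical and all table and column names are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Illustration only: renders INSERT statements following the pattern quoted
// in this mail. All table and column names are made up.
class InsertStatements {

    static String insert(boolean overwrite, String table,
                         Map<String, String> staticPartitions, String select) {
        StringBuilder sql = new StringBuilder("INSERT ");
        sql.append(overwrite ? "OVERWRITE" : "INTO").append(" TABLE ").append(table);
        if (!staticPartitions.isEmpty()) {
            // The PARTITION clause is optional and lists col=value pairs.
            StringJoiner spec = new StringJoiner(", ", " PARTITION (", ")");
            staticPartitions.forEach((col, val) -> spec.add(col + "=" + val));
            sql.append(spec);
        }
        return sql.append(' ').append(select).append(';').toString();
    }

    public static void main(String[] args) {
        Map<String, String> partitions = new LinkedHashMap<>();
        partitions.put("dt", "'2019-12-11'");
        System.out.println(insert(true, "sales", partitions, "SELECT id, amount FROM staging"));
        System.out.println(insert(false, "sales", new LinkedHashMap<>(), "SELECT id, amount, dt FROM staging"));
    }
}
```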

This directly blocks the SQL-CLI's insert syntax in the Hive integration
and seriously limits the practicability of the SQL-CLI.
Moreover, we have already introduced these two grammars to Flink in FLIP-63 [1].
Here are my questions:
1. Should we remove the hive dialect limitation for these two grammars?
2. Should we fix this in 1.10?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support

Best,
Jingsong Lee


Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-11 Thread Dawid Wysakowicz
Hi Becket,

Issue #1 - Design of Source interface

I mentioned the lack of a method like
Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext
context), because without it the current proposal is not complete/does not
work.

If we say that boundedness is an intrinsic property of a source imo we
don't need the Source#createEnumerator(Boundedness boundedness,
SplitEnumeratorContext context) method.

Assuming a source from my previous example:

Source source = KafkaSource.builder()
  ...
  .untilTimestamp(...)
  .build()

Would the enumerator differ if created like
source.createEnumerator(CONTINUOUS_UNBOUNDED, ...) vs source
.createEnumerator(BOUNDED, ...)? I know I am repeating myself, but this
is the part where my opinion differs the most from the current proposal. I
really think it should always be the source that tells whether it is bounded
or not. In the current proposal, the methods continousSource/boundedSource
somewhat reconfigure the source, which I think is misleading.

I think a call like:

Source source = KafkaSource.builder()
  ...
  .readContinously() / readUntilLatestOffset() / readUntilTimestamp / 
readUntilOffsets / ...
  .build()

is way cleaner (and expressive) than

Source source = KafkaSource.builder()
  ...
  .build()

env.continousSource(source) // which actually underneath would call 
createEnumerator(CONTINUOUS, ctx) which would be equivalent to 
source.readContinously().createEnumerator(ctx)
// or
env.boundedSource(source) // which actually underneath would call 
createEnumerator(BOUNDED, ctx) which would be equivalent to 
source.readUntilLatestOffset().createEnumerator(ctx)

Sorry for the comparison, but to me it seems there is too much magic
happening underneath those two calls.

I really believe the Source interface should have getBoundedness method
instead of (supportBoundedness) + createEnumerator(Boundedness, ...)
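
A minimal sketch of what I mean, with made-up types: DemoSource, its builder methods and the source(...) helper are hypothetical and not part of the FLIP. The builder call fixes the boundedness, and the environment only asks the source for it:

```java
// Hypothetical sketch: none of these types are the actual FLIP-27 API.
class BoundednessSketch {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    // The source itself reports whether it is bounded.
    interface Source {
        Boundedness getBoundedness();
    }

    // Stand-in for a Kafka-like source; the builder call decides the boundedness.
    static final class DemoSource implements Source {
        private final Boundedness boundedness;

        private DemoSource(Boundedness boundedness) {
            this.boundedness = boundedness;
        }

        @Override
        public Boundedness getBoundedness() {
            return boundedness;
        }

        static Builder builder() {
            return new Builder();
        }

        static final class Builder {
            private Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;

            Builder readContinuously() {
                boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
                return this;
            }

            // The timestamp itself is ignored in this sketch; only the boundedness matters.
            Builder readUntilTimestamp(long timestamp) {
                boundedness = Boundedness.BOUNDED;
                return this;
            }

            DemoSource build() {
                return new DemoSource(boundedness);
            }
        }
    }

    // A single entry point: the environment asks the source instead of being told.
    static String source(Source source) {
        return source.getBoundedness() == Boundedness.BOUNDED
                ? "bounded pipeline"
                : "continuous pipeline";
    }

    public static void main(String[] args) {
        System.out.println(source(DemoSource.builder().readContinuously().build()));
        System.out.println(source(DemoSource.builder().readUntilTimestamp(1576022400000L).build()));
    }
}
```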


Issue #2 - Design of
ExecutionEnvironment#source()/continuousSource()/boundedSource()

As you might have guessed, I am slightly in favor of a modified option #2.
Yes, I am aware every step of the DAG would have to be able to say if it
is bounded or not. I have a feeling it would be easier to express cross
bounded/unbounded operations, but I must admit I have not thought it
through thoroughly. In the spirit of "batch is just a special case of
streaming", I thought BoundedStream would extend DataStream. Correct
me if I am wrong. In such a setup the cross bounded/unbounded operation
could be expressed quite easily I think:

DataStream {
  DataStream join(DataStream, ...); // we could not really tell if the result 
is bounded or not, but because bounded stream is a special case of unbounded 
the API object is correct, irrespective if the left or right side of the join 
is bounded
}

BoundedStream extends DataStream {
  BoundedStream join(BoundedStream, ...); // only if both sides are bounded the 
result can be bounded as well. However we do have access to the DataStream#join 
here, so you can still join with a DataStream
}
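
As a compilable sketch of the hierarchy above (the classes are made-up stand-ins, not Flink classes, and isKnownBounded() exists only so the result can be printed):

```java
// Hypothetical sketch of the hierarchy above; these are not Flink classes.
class StreamHierarchy {

    static class DataStream {
        // The result may or may not be bounded, so the general type is returned.
        DataStream join(DataStream other) {
            return new DataStream();
        }

        boolean isKnownBounded() {
            return false;
        }
    }

    static class BoundedStream extends DataStream {
        // Overload: picked at compile time only when the argument's static type
        // is BoundedStream; DataStream#join stays available for everything else.
        BoundedStream join(BoundedStream other) {
            return new BoundedStream();
        }

        @Override
        boolean isKnownBounded() {
            return true;
        }
    }

    public static void main(String[] args) {
        BoundedStream a = new BoundedStream();
        BoundedStream b = new BoundedStream();
        DataStream c = new DataStream();
        System.out.println(a.join(b).isKnownBounded()); // bounded x bounded
        System.out.println(a.join(c).isKnownBounded()); // bounded x unbounded
    }
}
```

One caveat of this approach: overloads are resolved on static types, so a BoundedStream held in a DataStream variable would be joined through DataStream#join and lose its bounded type.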

On the other hand, I also see benefits of two completely disjoint APIs,
as we could prohibit some streaming calls in the bounded API. I can't
think of any unbounded operators that could not be implemented for
bounded streams.

Besides I think we both agree we don't like the method:

DataStream boundedStream(Source)

suggested in the current state of the FLIP. Do we ? :)

Best,

Dawid

On 10/12/2019 18:57, Becket Qin wrote:

> Hi folks,
>
> Thanks for the discussion, great feedback. Also thanks Dawid for the
> explanation, it is much clearer now.
>
> One thing that is indeed missing from the FLIP is how the boundedness is
> passed to the Source implementation. So the API should be
> Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext
> context)
> And we can probably remove the Source#supportBoundedness(Boundedness
> boundedness) method.
>
> Assuming we have that, we are essentially choosing from one of the
> following two options:
>
> Option 1:
> // The source is continuous source, and only unbounded operations can be
> performed.
> DataStream datastream = env.continuousSource(someSource);
>
> // The source is bounded source, both bounded and unbounded operations can
> be performed.
> BoundedDataStream boundedDataStream = env.boundedSource(someSource);
>
>   - Pros:
>a) explicit boundary between bounded / unbounded streams, it is
> quite simple and clear to the users.
>   - Cons:
>a) For applications that do not involve bounded operations, they
> still have to call different API to distinguish bounded / unbounded streams.
>b) No support for bounded stream to run in a streaming runtime
> setting, i.e. scheduling and operators behaviors.
>
>
> Option 2:
> // The source is either bounded or unbounded, but only unbounded operations
> could be performed on the returned DataStream.
> DataStream dataStream = env.source(someSource);
>
> // The source must be a bounded source, otherwise exception is thrown.
> 

Re: [DISCUSS] Adding e2e tests for Flink's Mesos integration

2019-12-11 Thread Till Rohrmann
+1 for building the image locally. If the need arises, we can always
change it later.

Cheers,
Till

On Wed, Dec 11, 2019 at 4:05 AM Xintong Song  wrote:

> Thanks, Yangze.
>
> +1 for building the image locally.
> The time consumption for both building the image locally and pulling it from
> DockerHub sounds reasonable and affordable. Therefore, I'm also in favor of
> avoiding the cost of maintaining a custom image.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Dec 11, 2019 at 10:11 AM Yangze Guo  wrote:
>
> > Thanks for the feedback, Yang.
> >
> > Some updates I want to share in this thread.
> > I have built a PoC version of the Mesos e2e test with the WordCount
> > workflow.[1] Then, I ran it in the testing environment. The results
> > are shown here[2]:
> > - Pulling the image from DockerHub took 1 minute and 21 seconds.
> > - Building it locally took 2 minutes and 54 seconds.
> >
> > I prefer building it locally. Although it is slower, I think the time
> > overhead of building or pulling the image is trivial compared to the
> > cost of maintaining the image in DockerHub and to the whole test
> > process.
> >
> > I look forward to hearing from you. ;)
> >
> > Best,
> > Yangze Guo
> >
> > [1]
> >
> https://github.com/KarmaGYZ/flink/commit/0406d942446a1b17f81d93235b21a829bf88ccf0
> > [2]https://travis-ci.org/KarmaGYZ/flink/jobs/623207957
> > Best,
> > Yangze Guo
> >
> > On Mon, Dec 9, 2019 at 2:39 PM Yang Wang  wrote:
> > >
> > > Thanks Yangze for starting this discussion.
> > >
> > > Just share my thoughts.
> > >
> > > If the mesos official docker image could not meet our requirement, i
> > suggest to build the image locally.
> > > We have done the same things for yarn e2e tests. This way is more
> > flexible and easy to maintain. However,
> > > i have no idea how long building the mesos image locally will take.
> > Based on previous experience of yarn, i
> > > think it may not take too much time.
> > >
> > >
> > >
> > > Best,
> > > Yang
> > >
> > > On Sat, Dec 7, 2019 at 4:25 PM Yangze Guo wrote:
> > >>
> > >> Thanks for your feedback!
> > >>
> > >> @Till
> > >> Regarding the time overhead, I think it mainly comes from the network
> > >> transmission. Building the image locally downloads 260MB of files in
> > >> total, including the base image and packages. For pulling from
> > >> DockerHub, the compressed size of the image is 347MB. Thus, I agree
> > >> that it is ok to build the image locally.
> > >>
> > >> @Piyush
> > >> Thank you for offering the help and sharing your usage scenario. In
> > >> current stage, I think it will be really helpful if you can compress
> > >> the custom image[1] or reduce the time overhead to build it locally.
> > >> Any ideas for improving test coverage will also be appreciated.
> > >>
> > >> [1]
> >
> https://hub.docker.com/layers/karmagyz/mesos-flink/latest/images/sha256-4e1caefea107818aa11374d6ac8a6e889922c81806f5cd791ead141f18ec7e64
> > >>
> > >> Best,
> > >> Yangze Guo
> > >>
> > >> On Sat, Dec 7, 2019 at 3:17 AM Piyush Narang 
> > wrote:
> > >> >
> > >> > +1 from our end as well. At Criteo, we are running some Flink jobs
> on
> > Mesos in production to compute short term features for machine learning.
> > We’d love to help out and contribute on this initiative.
> > >> >
> > >> > Thanks,
> > >> > -- Piyush
> > >> >
> > >> >
> > >> > From: Till Rohrmann 
> > >> > Date: Friday, December 6, 2019 at 8:10 AM
> > >> > To: dev 
> > >> > Cc: user 
> > >> > Subject: Re: [DISCUSS] Adding e2e tests for Flink's Mesos
> integration
> > >> >
> > >> > Big +1 for adding a fully working e2e test for Flink's Mesos
> > integration. Ideally we would have it ready for the 1.10 release. The
> lack
> > of such a test has bitten us already multiple times.
> > >> >
> > >> > In general I would prefer to use the official image if possible since
> > >> > it frees us from maintaining our own custom image. Since Java 9 is no
> > >> > longer officially supported as we opted for supporting Java 11 (LTS),
> > >> > it might not be feasible, though. How much longer would building the
> > >> > custom image take compared to downloading it from DockerHub? Maybe it
> > >> > is ok to build the image locally. Then we would not have to maintain
> > >> > the image.
> > >> >
> > >> > Cheers,
> > >> > Till
> > >> >
> > >> > On Fri, Dec 6, 2019 at 11:05 AM Yangze Guo wrote:
> > >> > Hi, all,
> > >> >
> > >> > Currently, there is no end-to-end test or IT case for Mesos deployment,
> > >> > while common deployment-related development inevitably touches the
> > >> > logic of this component. Thus, some work needs to be done to guarantee
> > >> > a good experience for both Mesos users and contributors. After offline
> > >> > discussion with Till and Xintong, we have some basic ideas and would
> > >> > like to start a discussion thread on adding end-to-end tests for
> > >> > Flink's Mesos integration.
> > >> >
> > >> > As a first step, we would like to keep the scope of this
> contribution
> > >> > to be 

[jira] [Created] (FLINK-15194) Directories in distributed caches are not extracted in Yarn Per Job Cluster Mode

2019-12-11 Thread Wei Zhong (Jira)
Wei Zhong created FLINK-15194:
-

 Summary: Directories in distributed caches are not extracted in 
Yarn Per Job Cluster Mode
 Key: FLINK-15194
 URL: https://issues.apache.org/jira/browse/FLINK-15194
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.10.0
Reporter: Wei Zhong


If we insert the following code into the WordCount batch example:
{code:java}
File testDirectory = new File("test_directory");
testDirectory.mkdirs();
env.registerCachedFile(testDirectory.getAbsolutePath(), "test_directory");

text = text.map(new RichMapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        // Look up the directory registered in the distributed cache above.
        File testDirectory = getRuntimeContext().getDistributedCache().getFile("test_directory");
        if (!testDirectory.isDirectory()) {
            throw new RuntimeException(
                String.format("the directory %s is not a directory!", testDirectory.getAbsolutePath()));
        }
        return value;
    }
});
{code}
It works well in standalone mode but fails in Yarn Per Job Cluster Mode. The 
exception is:
{code:java}
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: da572c60eb63b13b7a90892f1958a7b7)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:146)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:671)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:933)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1006)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1006)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: da572c60eb63b13b7a90892f1958a7b7)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:93)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:115)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: da572c60eb63b13b7a90892f1958a7b7)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:532)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
at 
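
The symptom in the issue title ("directories ... are not extracted") can be illustrated without a cluster. The following is only a rough sketch under the assumption that a cached directory is shipped as a zip archive and has to be unpacked on the worker before isDirectory() can return true. The class CachedDirectorySketch and its methods zipDirectory and unzip are hypothetical names made up for illustration; this is not Flink's implementation.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Enumeration;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;

// Hypothetical illustration only; not part of Flink.
final class CachedDirectorySketch {

    /** Packs dir (including its own name as the top-level entry) into the archive zip. */
    static void zipDirectory(Path dir, Path zip) throws IOException {
        try (ZipOutputStream out = new ZipOutputStream(Files.newOutputStream(zip));
             Stream<Path> paths = Files.walk(dir)) {
            for (Path p : (Iterable<Path>) paths::iterator) {
                String name = dir.getParent().relativize(p).toString().replace('\\', '/');
                if (Files.isDirectory(p)) {
                    out.putNextEntry(new ZipEntry(name + "/"));
                } else {
                    out.putNextEntry(new ZipEntry(name));
                    Files.copy(p, out);
                }
                out.closeEntry();
            }
        }
    }

    /** Unpacks the archive zip below targetRoot, recreating directories and files. */
    static void unzip(Path zip, Path targetRoot) throws IOException {
        try (ZipFile zipFile = new ZipFile(zip.toFile())) {
            Enumeration<? extends ZipEntry> entries = zipFile.entries();
            while (entries.hasMoreElements()) {
                ZipEntry entry = entries.nextElement();
                Path target = targetRoot.resolve(entry.getName()).normalize();
                if (!target.startsWith(targetRoot)) {
                    throw new IOException("Entry escapes target directory: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(target);
                } else {
                    Files.createDirectories(target.getParent());
                    try (InputStream in = zipFile.getInputStream(entry)) {
                        Files.copy(in, target);
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("cache-sketch");
        Path dir = Files.createDirectory(root.resolve("test_directory"));
        Files.write(dir.resolve("a.txt"), "hello".getBytes(StandardCharsets.UTF_8));

        Path zip = root.resolve("test_directory.zip");
        zipDirectory(dir, zip);
        // The shipped archive itself is a regular file, so the check from the report fails on it.
        System.out.println("archive is directory: " + Files.isDirectory(zip));

        Path extractedRoot = Files.createDirectory(root.resolve("extracted"));
        unzip(zip, extractedRoot);
        // Only after extraction does the path behave like the original directory.
        System.out.println("extracted is directory: "
            + Files.isDirectory(extractedRoot.resolve("test_directory")));
    }
}
```

If the archive is handed to user code without the unzip step, a check like the one in the reported map function would throw, which matches the reported behavior under the stated assumption.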

[jira] [Created] (FLINK-15193) Move DDL to first tab in table connector page

2019-12-11 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-15193:


 Summary: Move DDL to first tab in table connector page
 Key: FLINK-15193
 URL: https://issues.apache.org/jira/browse/FLINK-15193
 Project: Flink
  Issue Type: Task
  Components: Documentation
Reporter: Jingsong Lee
 Fix For: 1.10.0


Since we now have good support for DDL in tableEnv.sqlUpdate and the SQL CLI, I 
think it is time to highlight DDL in the documentation.


