Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread 刘建刚
Good work for Flink's batch processing!
A remote shuffle service can resolve the container-lost problem and reduce
the running time of batch jobs after a failover. We have investigated the
component a lot and welcome Flink's native solution. We will try it and
help improve it.

Thanks,
Liu Jiangang

On Tue, Nov 30, 2021 at 9:33 PM, Yingjie Cao  wrote:

> Hi dev & users,
>
> We are happy to announce the open-sourcing of the remote shuffle project [1]
> for Flink. The project originated in Alibaba, and its main motivation is to
> improve batch data processing in terms of both performance and stability, and
> to further embrace cloud-native deployments. For more details about the
> project's features, please refer to [1].
>
> Before going open source, the project was used widely in production and it
> has performed well in terms of both stability and performance. We hope you
> enjoy it. Collaboration and feedback are highly appreciated.
>
> Best,
> Yingjie on behalf of all contributors
>
> [1] https://github.com/flink-extended/flink-remote-shuffle
>


Re: FLink Accessing two hdfs cluster

2021-11-30 Thread David Morávek
Can you please elaborate on what solved your issue, so others who run
into it can learn from your experience?

Best,
D.

On Wed 1. 12. 2021 at 3:38, chenqizhu  wrote:

> Hi,
>
>   My problem has been solved. Thank you again
>
> Best regards
>
> On 2021-12-01 09:58:52, "chenqizhu"  wrote:
>
> Hi David,
>
>    Thanks for your reply.
>
>    --this exception doesn't seem to come from Flink, but rather from a
> YARN container bootstrap.
>    --In this case the exception happens before any Flink code is executed
> by the NodeManager.
>
> If that's the case, how does the NodeManager know about the 'BCluster' I
> configured in Flink?
>
>
> In short, there are two HDFS clusters and I want to access the one (called
> BCluster) that is not the default cluster of the Flink client. (The YARN
> cluster contains all nodes of both HDFS clusters.)
>
>
>There are more details in JIRA FLINK-25099
> 
>
>
>
> At 2021-11-30 21:50:08, "David Morávek"  wrote:
>
> Hi chenqizhu,
>
> When a YARN container starts up, it needs to download resources from HDFS
> (your job archives / configuration / distributed cache / ...) which are
> necessary for starting the user application (in Flink's case, the JobManager /
> TaskManager). As far as I can tell, the affected NodeManager tries to pull
> data from a filesystem it doesn't have access to (refer to hdfs-site.xml /
> YARN logs on the particular node).
>
> question: Why can't flink-conf (flink.hadoop.*) overwrite the
>> configurations read by the YARN NodeManager?
>>
>
> In this case the exception happens before any Flink code is executed by
> the NodeManager.
>
> I think the NM logs can help you identify which files are not accessible
> by YARN; that could narrow it down a bit.
>
> Best,
> D.
>
> On Tue, Nov 30, 2021 at 9:23 AM chenqizhu  wrote:
>
>> hi,
>> Flink 1.13 supports configuring Hadoop properties in flink-conf.yaml via
>> flink.hadoop.*. We have a requirement to write checkpoints to an HDFS
>> cluster backed by SSDs (called BCluster) to speed up checkpoint writing,
>> but this cluster is not the default HDFS of the Flink client (called
>> ACluster). flink-conf.yaml is configured with nameservices for cluster A
>> and cluster B, similar to HDFS federation mode.
>>
>> The configuration is as follows:
>>
>> flink.hadoop.dfs.nameservices: ACluster,BCluster
>> flink.hadoop.fs.defaultFS: hdfs://BCluster
>> flink.hadoop.dfs.ha.namenodes.ACluster: nn1,nn2
>> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn1: 10.:9000
>> flink.hadoop.dfs.namenode.http-address.ACluster.nn1: 10.:50070
>> flink.hadoop.dfs.namenode.rpc-address.ACluster.nn2: 10.xx:9000
>> flink.hadoop.dfs.namenode.http-address.ACluster.nn2: 10.xx:50070
>> flink.hadoop.dfs.client.failover.proxy.provider.ACluster: 
>> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>>
>> flink.hadoop.dfs.ha.namenodes.BCluster: nn1,nn2
>> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn1: 10.xx:9000
>> flink.hadoop.dfs.namenode.http-address.BCluster.nn1: 10.xx:50070
>> flink.hadoop.dfs.namenode.rpc-address.BCluster.nn2: 10.xx:9000
>> flink.hadoop.dfs.namenode.http-address.BCluster.nn2: 10.x:50070
>> flink.hadoop.dfs.client.failover.proxy.provider.BCluster: 
>> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>>
>> However, an error occurred during job startup, reported as follows:
>>
>> (When the configuration is switched back to the Flink client's default
>> HDFS cluster, the job starts normally: flink.hadoop.fs.defaultFS:
>> hdfs://ACluster)
>>
>>
>> Failing this attempt.Diagnostics: [2021-11-30 
>> 15:39:15.582]java.net.UnknownHostException: BCluster
>>
>> java.lang.IllegalArgumentException: java.net.UnknownHostException: BCluster
>>
>>  at 
>> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:448)
>>  at 
>> org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
>>  at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:374)
>>  at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:308)
>>  at 
>> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:184)
>>  at 
>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
>>  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
>>  at 
>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
>>  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
>>  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
>>  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
>>  at 
>> org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:270)
>>  at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:68)
>>  at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:415)
>>   
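
For YARN resource localization to succeed, the NodeManager's own Hadoop
configuration must also be able to resolve the BCluster nameservice; the
Flink-side flink.hadoop.* entries are not visible to the NodeManager process.
A minimal sketch of the hdfs-site.xml additions this would need on the YARN
nodes, assuming an HA setup with two NameNodes (host names are placeholders):

<property>
  <name>dfs.nameservices</name>
  <value>ACluster,BCluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.BCluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.BCluster.nn1</name>
  <value>bcluster-nn1.example.com:9000</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.BCluster.nn2</name>
  <value>bcluster-nn2.example.com:9000</value>
</property>
<property>
  <name>dfs.client.failover.proxy.provider.BCluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>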

Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Yun Gao
Many thanks for all the warm responses! We greatly welcome more use cases
and collaboration on Flink Remote Shuffle and batch processing with Flink~

Best,
Yun


--
From:Yingjie Cao 
Send Time:2021 Dec. 1 (Wed.) 11:16
To:dev 
Subject:Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch 
data processing

Hi Jing,

Great to hear that; collaboration and feedback are welcome.

Best,
Yingjie

On Wed, Dec 1, 2021 at 10:34 AM, Jing Zhang  wrote:

> Amazing!
> A remote shuffle service is an important improvement for the batch data
> processing experience on Flink.
> It is also a strong requirement in our internal batch business; we will
> try it soon and give you feedback.
>
> Best,
> Jing Zhang
>
> On Wed, Dec 1, 2021 at 3:25 AM, Martijn Visser  wrote:
>
> > Hi Yingjie,
> >
> > This is great, thanks for sharing. Will you also add it to
> > https://flink-packages.org/ ?
> >
> > Best regards,
> >
> > Martijn
> >
> > On Tue, 30 Nov 2021 at 17:31, Till Rohrmann 
> wrote:
> >
> > > Great news, Yingjie. Thanks a lot for sharing this information with the
> > > community and kudos to all the contributors of the external shuffle
> > service
> > > :-)
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao 
> > > wrote:
> > >
> > > > Hi dev & users,
> > > >
> > > > We are happy to announce the open-sourcing of the remote shuffle
> > > > project [1] for Flink. The project originated in Alibaba, and its
> > > > main motivation is to improve batch data processing in terms of both
> > > > performance and stability, and to further embrace cloud-native
> > > > deployments. For more details about the project's features, please
> > > > refer to [1].
> > > >
> > > > Before going open source, the project was used widely in production
> > > > and it has performed well in terms of both stability and performance.
> > > > We hope you enjoy it. Collaboration and feedback are highly
> > > > appreciated.
> > > >
> > > > Best,
> > > > Yingjie on behalf of all contributors
> > > >
> > > > [1] https://github.com/flink-extended/flink-remote-shuffle
> > > >
> > >
> >
>



[jira] [Created] (FLINK-25124) A deadlock occurs when the JDBC sink is used with two consecutive dimension table (lookup) joins

2021-11-30 Thread shizhengchao (Jira)
shizhengchao created FLINK-25124:


 Summary: A deadlock occurs when the JDBC sink is used with two consecutive 
dimension table (lookup) joins
 Key: FLINK-25124
 URL: https://issues.apache.org/jira/browse/FLINK-25124
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.13.1
Reporter: shizhengchao


 

The SQL statement is as follows:
{code:java}
INSERT INTO imei_phone_domestic_realtime
  SELECT
  t.data.imei AS imei,
  CAST(t.data.register_date_key AS bigint) AS register_date_key,
  c.agent_type AS channel_name,
  c.agent_short_name,
  c.agent_name,
  c.agent_chinese_name,
  c.isforeign AS agent_market_type,
  p.seriename AS series_name,
  p.salename AS sale_name,
  p.devname AS dev_name,
  p.devnamesource AS dev_name_source,
  p.color,
  p.isforeign AS product_market_type,
  p.carrier,
  p.lcname AS life_cycle,
  IFNULL(p.shipping_price,0) AS shipping_price,
  IFNULL(p.retail_price,0) AS  retail_price
  FROM kafka_imei_phone_domestic_realtime AS t
  LEFT JOIN dim_product FOR SYSTEM_TIME AS OF t.proctime AS p ON 
p.pn=t.item_code
  LEFT JOIN dim_customer FOR SYSTEM_TIME AS OF t.proctime AS c ON 
c.customer_code=t.customer_code
  where t.eventType='update'; {code}
There is a chance of a deadlock:
{code:java}
"jdbc-upsert-output-format-thread-1" Id=84 BLOCKED on 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat@649788af 
owned by "Legacy Source Thread - Source: 
TableSourceScan(table=[[default_catalog, default_database, 
kafka_imei_phone_domestic_realtime]], fields=[data, eventType]) -> 
Calc(select=[data, data.item_code AS $f3], where=[(eventType = 
_UTF-16LE'update':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> 
LookupJoin(table=[default_catalog.default_database.dim_product], 
joinType=[LeftOuterJoin], async=[false], lookup=[pn=$f3], select=[data, $f3, 
pn, color, isforeign, devname, salename, seriename, lcname, carrier, 
devnamesource, shipping_price, retail_price]) -> Calc(select=[data, color, 
isforeign, devname, salename, seriename, lcname, carrier, devnamesource, 
shipping_price, retail_price, data.customer_code AS $f31]) -> 
LookupJoin(table=[default_catalog.default_database.dim_customer], 
joinType=[LeftOuterJoin], async=[false], lookup=[customer_code=$f31], 
select=[data, color, isforeign, devname, salename, seriename, lcname, carrier, 
devnamesource, shipping_price, retail_price, $f31, customer_code, 
agent_short_name, agent_name, isforeign, agent_type, agent_chinese_name]) -> 
Calc(select=[data.imei AS imei, CAST(data.register_date_key) AS 
register_date_key, agent_type AS channel_name, agent_short_name, agent_name, 
agent_chinese_name, isforeign0 AS agent_market_type, seriename AS series_name, 
salename AS sale_name, devname AS dev_name, devnamesource AS dev_name_source, 
color, isforeign AS product_market_type, carrier, lcname AS life_cycle, 
IFNULL(shipping_price, 0:DECIMAL(10, 0)) AS shipping_price, 
IFNULL(retail_price, 0:DECIMAL(10, 0)) AS retail_price]) -> 
NotNullEnforcer(fields=[imei]) -> Sink: 
Sink(table=[default_catalog.default_database.imei_phone_domestic_realtime], 
fields=[imei, register_date_key, channel_name, agent_short_name, agent_name, 
agent_chinese_name, agent_market_type, series_name, sale_name, dev_name, 
dev_name_source, color, product_market_type, carrier, life_cycle, 
shipping_price, retail_price]) (6/12)#0" Id=82
    at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:124)
    -  blocked on 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat@649788af
    at 
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat$$Lambda$344/21845506.run(Unknown
 Source)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ...    Number of locked synchronizers = 1
    - java.util.concurrent.ThreadPoolExecutor$Worker@325612a2 {code}
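
For context, a heavily simplified sketch (not the actual Flink source) of the 
locking pattern visible in the thread dump: the batching output format 
schedules a background flusher that synchronizes on the format instance, while 
the task thread holds the same monitor during writeRecord/flush. If a flush 
blocks for a long time (for example on database locks), the other thread shows 
up as BLOCKED, as above. Class and method names below are illustrative:
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of a batching output format with a periodic flusher.
public class BatchingOutputFormatSketch {

    private final List<String> batch = new ArrayList<>();
    private final ScheduledExecutorService flusher =
            Executors.newScheduledThreadPool(1);

    public void open() {
        // Mirrors "jdbc-upsert-output-format-thread-1" in the dump: the
        // scheduled task takes the same monitor as writeRecord().
        flusher.scheduleWithFixedDelay(
                () -> {
                    synchronized (BatchingOutputFormatSketch.this) {
                        flush();
                    }
                },
                1, 1, TimeUnit.SECONDS);
    }

    public synchronized void writeRecord(String record) {
        batch.add(record);
        if (batch.size() >= 100) {
            // The task thread flushes while holding the monitor; if this
            // blocks (e.g. on a database lock), the flusher thread stays
            // BLOCKED on the same monitor.
            flush();
        }
    }

    private void flush() {
        // A real implementation would execute the JDBC batch here.
        batch.clear();
    }

    public void close() {
        flusher.shutdown();
    }
}
{code}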
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25123) Improve expression description in SQL operator

2021-11-30 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25123:
---

 Summary: Improve expression description in SQL operator
 Key: FLINK-25123
 URL: https://issues.apache.org/jira/browse/FLINK-25123
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Wenlong Lyu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-194: Introduce the JobResultStore

2021-11-30 Thread Zhu Zhu
Thanks for the explanation Matthias. The solution sounds good to me.
I have no more concerns and +1 for the FLIP.

Thanks,
Zhu

On Wed, Dec 1, 2021 at 12:56 PM, Xintong Song  wrote:

> @David,
>
> Thanks for the clarification.
>
> No more concerns from my side. +1 for this FLIP.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Dec 1, 2021 at 12:28 AM Till Rohrmann 
> wrote:
>
> > Given the other breaking changes, I think that it is ok to remove the
> > `RunningJobsRegistry` completely.
> >
> > Since we allow users to specify a HighAvailabilityServices implementation
> > when starting Flink via `high-availability: FQDN`, I think we should mark
> > the interface at least @Experimental.
> >
> > Cheers,
> > Till
> >
> > On Tue, Nov 30, 2021 at 2:29 PM Mika Naylor  wrote:
> >
> > > Hi Till,
> > >
> > > We thought that breaking interfaces, specifically
> > > HighAvailabilityServices and RunningJobsRegistry, was acceptable in
> this
> > > instance because:
> > >
> > > - Neither of these interfaces are marked @Public and so carry no
> > >guarantees about being public and stable.
> > > - As far as we are aware, we currently have no users with custom
> > >HighAvailabilityServices implementations.
> > > - The interface was already broken in 1.14 with the changes to
> > >CheckpointRecoveryFactory, and will likely be changed again in 1.15
> > >due to further changes in that factory.
> > >
> > > Given that, we thought changes to the interface would not be
> disruptive.
> > > Perhaps it could be annotated as @Internal - I'm not sure exactly what
> > > guarantees we try and give for the stability of the
> > > HighAvailabilityServices interface.
> > >
> > > Kind regards,
> > > Mika
> > >
> > > On 26.11.2021 18:28, Till Rohrmann wrote:
> > > >Thanks for creating this FLIP Matthias, Mika and David.
> > > >
> > > >I think the JobResultStore is an important piece for fixing Flink's
> last
> > > >high-availability problem (afaik). Once we have this piece in place,
> > users
> > > >no longer risk re-executing a successfully completed job.
> > > >
> > > >I have one comment concerning breaking interfaces:
> > > >
> > > >If we don't want to break interfaces, then we could keep the
> > > >HighAvailabilityServices.getRunningJobsRegistry() method and add a
> > default
> > > >implementation for HighAvailabilityServices.getJobResultStore(). We
> > could
> > > >then deprecate the former method and then remove it in the subsequent
> > > >release (1.16).
> > > >
> > > >Apart from that, +1 for the FLIP.
> > > >
> > > >Cheers,
> > > >Till
> > > >
> > > >On Wed, Nov 17, 2021 at 6:05 PM David Morávek 
> wrote:
> > > >
> > > >> Hi everyone,
> > > >>
> > > >> Matthias, Mika and I want to start a discussion about introduction
> of
> > a
> > > new
> > > >> Flink component, the *JobResultStore*.
> > > >>
> > > >> The main motivation is to address shortcomings of the
> > > *RunningJobsRegistry*
> > > >> and supersede it with the new component. These shortcomings have been
> > > first
> > > >> described in FLINK-11813 [1].
> > > >>
> > > >> This change should improve the overall stability of the JobManager's
> > > >> components and address the race conditions in some of the fail over
> > > >> scenarios during the job cleanup lifecycle.
> > > >>
> > > >> It should also help to ensure that Flink doesn't leave any uncleaned
> > > >> resources behind.
> > > >>
> > > >> We've prepared a FLIP-194 [2], which outlines the design and
> reasoning
> > > >> behind this new component.
> > > >>
> > > >> [1] https://issues.apache.org/jira/browse/FLINK-11813
> > > >> [2]
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195726435
> > > >>
> > > >> We're looking forward to your feedback ;)
> > > >>
> > > >> Best,
> > > >> Matthias, Mika and David
> > > >>
> > >
> > > Mika Naylor
> > > https://autophagy.io
> > >
> >
>


[jira] [Created] (FLINK-25122) flink-dist/src/main/flink-bin/bin/flink does not expand variable for FLINK_ENV_JAVA_OPTS

2021-11-30 Thread L Z (Jira)
L Z created FLINK-25122:
---

 Summary: flink-dist/src/main/flink-bin/bin/flink does not expand 
variable for FLINK_ENV_JAVA_OPTS
 Key: FLINK-25122
 URL: https://issues.apache.org/jira/browse/FLINK-25122
 Project: Flink
  Issue Type: Bug
  Components: Command Line Client
Affects Versions: 1.12.5
Reporter: L Z


After adding 
{code:yaml}
env.java.opts: "-Xloggc:${FLINK_LOG_PREFIX}.gc.log <...omitted chars...>"
{code}
to flink-conf.yaml, the flink CLI fails with:
{code:bash}
Invalid file name for use with -Xloggc: Filename can only contain the 
characters [A-Z][a-z][0-9]-_.%[p|t] but it has been ${FLINK_LOG_PREFIX}.gc.log 
Note %p or %t can only be used once Error: Could not create the Java Virtual 
Machine. Error: A fatal exception has occurred. Program will exit.{code}
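
The underlying shell behavior, independent of Flink: a placeholder read from a 
config file is a literal string and is not expanded unless the script expands 
it explicitly. A minimal sketch (variable names illustrative):
{code:bash}
#!/usr/bin/env bash
FLINK_LOG_PREFIX=/tmp/flink-client

# Single quotes stand in for "value read verbatim from flink-conf.yaml".
JAVA_OPTS='-Xloggc:${FLINK_LOG_PREFIX}.gc.log'

echo "literal:  ${JAVA_OPTS}"
# -> -Xloggc:${FLINK_LOG_PREFIX}.gc.log   (what the JVM currently receives)

# One possible fix: expand the value before handing it to java. Note that
# eval on untrusted config values is risky; this is for illustration only.
EXPANDED_OPTS=$(eval echo "${JAVA_OPTS}")
echo "expanded: ${EXPANDED_OPTS}"
# -> -Xloggc:/tmp/flink-client.gc.log
{code}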



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-194: Introduce the JobResultStore

2021-11-30 Thread Xintong Song
@David,

Thanks for the clarification.

No more concerns from my side. +1 for this FLIP.

Thank you~

Xintong Song



On Wed, Dec 1, 2021 at 12:28 AM Till Rohrmann  wrote:

> Given the other breaking changes, I think that it is ok to remove the
> `RunningJobsRegistry` completely.
>
> Since we allow users to specify a HighAvailabilityServices implementation
> when starting Flink via `high-availability: FQDN`, I think we should mark
> the interface at least @Experimental.
>
> Cheers,
> Till
>
> On Tue, Nov 30, 2021 at 2:29 PM Mika Naylor  wrote:
>
> > Hi Till,
> >
> > We thought that breaking interfaces, specifically
> > HighAvailabilityServices and RunningJobsRegistry, was acceptable in this
> > instance because:
> >
> > - Neither of these interfaces are marked @Public and so carry no
> >guarantees about being public and stable.
> > - As far as we are aware, we currently have no users with custom
> >HighAvailabilityServices implementations.
> > - The interface was already broken in 1.14 with the changes to
> >CheckpointRecoveryFactory, and will likely be changed again in 1.15
> >due to further changes in that factory.
> >
> > Given that, we thought changes to the interface would not be disruptive.
> > Perhaps it could be annotated as @Internal - I'm not sure exactly what
> > guarantees we try and give for the stability of the
> > HighAvailabilityServices interface.
> >
> > Kind regards,
> > Mika
> >
> > On 26.11.2021 18:28, Till Rohrmann wrote:
> > >Thanks for creating this FLIP Matthias, Mika and David.
> > >
> > >I think the JobResultStore is an important piece for fixing Flink's last
> > >high-availability problem (afaik). Once we have this piece in place,
> users
> > >no longer risk re-executing a successfully completed job.
> > >
> > >I have one comment concerning breaking interfaces:
> > >
> > >If we don't want to break interfaces, then we could keep the
> > >HighAvailabilityServices.getRunningJobsRegistry() method and add a
> default
> > >implementation for HighAvailabilityServices.getJobResultStore(). We
> could
> > >then deprecate the former method and then remove it in the subsequent
> > >release (1.16).
> > >
> > >Apart from that, +1 for the FLIP.
> > >
> > >Cheers,
> > >Till
> > >
> > >On Wed, Nov 17, 2021 at 6:05 PM David Morávek  wrote:
> > >
> > >> Hi everyone,
> > >>
> > >> Matthias, Mika and I want to start a discussion about introduction of
> a
> > new
> > >> Flink component, the *JobResultStore*.
> > >>
> > >> The main motivation is to address shortcomings of the
> > *RunningJobsRegistry*
> > >> and supersede it with the new component. These shortcomings have been
> > first
> > >> described in FLINK-11813 [1].
> > >>
> > >> This change should improve the overall stability of the JobManager's
> > >> components and address the race conditions in some of the fail over
> > >> scenarios during the job cleanup lifecycle.
> > >>
> > >> It should also help to ensure that Flink doesn't leave any uncleaned
> > >> resources behind.
> > >>
> > >> We've prepared a FLIP-194 [2], which outlines the design and reasoning
> > >> behind this new component.
> > >>
> > >> [1] https://issues.apache.org/jira/browse/FLINK-11813
> > >> [2]
> > >>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195726435
> > >>
> > >> We're looking forward to your feedback ;)
> > >>
> > >> Best,
> > >> Matthias, Mika and David
> > >>
> >
> > Mika Naylor
> > https://autophagy.io
> >
>


[jira] [Created] (FLINK-25121) Support ML Python API

2021-11-30 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-25121:


 Summary: Support ML Python API
 Key: FLINK-25121
 URL: https://issues.apache.org/jira/browse/FLINK-25121
 Project: Flink
  Issue Type: New Feature
  Components: API / Python, Library / Machine Learning
Reporter: Huang Xingbo
Assignee: Huang Xingbo






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25120) Add many kinds of checks in ML Python API

2021-11-30 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-25120:


 Summary: Add many kinds of checks in ML Python API
 Key: FLINK-25120
 URL: https://issues.apache.org/jira/browse/FLINK-25120
 Project: Flink
  Issue Type: New Feature
  Components: API / Python, Library / Machine Learning
Reporter: Huang Xingbo
Assignee: Huang Xingbo
 Fix For: 0.1.0


Add several kinds of checks to the ML Python API. These checks include 
pytest, flake8 and mypy.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25119) The freeSlots parameter of the SlotSharingSlotAllocator#determineParallelism method is named incorrectly.

2021-11-30 Thread Cassie (Jira)
Cassie created FLINK-25119:
--

 Summary: The freeSlots parameter of the 
SlotSharingSlotAllocator#determineParallelism method is named incorrectly.
 Key: FLINK-25119
 URL: https://issues.apache.org/jira/browse/FLINK-25119
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.3, 1.13.2, 1.13.1, 1.14.0, 1.13.0
Reporter: Cassie
 Attachments: image-2021-12-01-11-18-34-572.png

The second parameter of the determineParallelism method, freeSlots, is meant
to be the slots that are free in the SlotPool. In actual usage, however, some
callers pass in all slots of the SlotPool (allocated slots and free slots),
while others pass in only the free slots. Naming the parameter freeSlots is
therefore inaccurate and hampers understanding of the determineParallelism logic.

!image-2021-12-01-11-18-34-572.png!

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Yingjie Cao
Hi Jing,

Great to hear that; collaboration and feedback are welcome.

Best,
Yingjie

On Wed, Dec 1, 2021 at 10:34 AM, Jing Zhang  wrote:

> Amazing!
> A remote shuffle service is an important improvement for the batch data
> processing experience on Flink.
> It is also a strong requirement in our internal batch business; we will
> try it soon and give you feedback.
>
> Best,
> Jing Zhang
>
> On Wed, Dec 1, 2021 at 3:25 AM, Martijn Visser  wrote:
>
> > Hi Yingjie,
> >
> > This is great, thanks for sharing. Will you also add it to
> > https://flink-packages.org/ ?
> >
> > Best regards,
> >
> > Martijn
> >
> > On Tue, 30 Nov 2021 at 17:31, Till Rohrmann 
> wrote:
> >
> > > Great news, Yingjie. Thanks a lot for sharing this information with the
> > > community and kudos to all the contributors of the external shuffle
> > service
> > > :-)
> > >
> > > Cheers,
> > > Till
> > >
> > > On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao 
> > > wrote:
> > >
> > > > Hi dev & users,
> > > >
> > > > We are happy to announce the open-sourcing of the remote shuffle
> > > > project [1] for Flink. The project originated in Alibaba, and its
> > > > main motivation is to improve batch data processing in terms of both
> > > > performance and stability, and to further embrace cloud-native
> > > > deployments. For more details about the project's features, please
> > > > refer to [1].
> > > >
> > > > Before going open source, the project was used widely in production
> > > > and it has performed well in terms of both stability and performance.
> > > > We hope you enjoy it. Collaboration and feedback are highly
> > > > appreciated.
> > > >
> > > > Best,
> > > > Yingjie on behalf of all contributors
> > > >
> > > > [1] https://github.com/flink-extended/flink-remote-shuffle
> > > >
> > >
> >
>


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Yingjie Cao
Hi Martijn,

Yes, we will add it to flink-packages soon.

Best,
Yingjie

On Wed, Dec 1, 2021 at 3:24 AM, Martijn Visser  wrote:

> Hi Yingjie,
>
> This is great, thanks for sharing. Will you also add it to
> https://flink-packages.org/ ?
>
> Best regards,
>
> Martijn
>
> On Tue, 30 Nov 2021 at 17:31, Till Rohrmann  wrote:
>
> > Great news, Yingjie. Thanks a lot for sharing this information with the
> > community and kudos to all the contributors of the external shuffle
> service
> > :-)
> >
> > Cheers,
> > Till
> >
> > On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao 
> > wrote:
> >
> > > Hi dev & users,
> > >
> > > We are happy to announce the open-sourcing of the remote shuffle
> > > project [1] for Flink. The project originated in Alibaba, and its main
> > > motivation is to improve batch data processing in terms of both
> > > performance and stability, and to further embrace cloud-native
> > > deployments. For more details about the project's features, please
> > > refer to [1].
> > >
> > > Before going open source, the project was used widely in production
> > > and it has performed well in terms of both stability and performance.
> > > We hope you enjoy it. Collaboration and feedback are highly appreciated.
> > >
> > > Best,
> > > Yingjie on behalf of all contributors
> > >
> > > [1] https://github.com/flink-extended/flink-remote-shuffle
> > >
> >
>


[jira] [Created] (FLINK-25118) Add vertex index as prefix in vertex name

2021-11-30 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25118:
---

 Summary: Add vertex index as prefix in vertex name
 Key: FLINK-25118
 URL: https://issues.apache.org/jira/browse/FLINK-25118
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25117) NoSuchMethodError getCatalog()

2021-11-30 Thread zzt (Jira)
zzt created FLINK-25117:
---

 Summary: NoSuchMethodError getCatalog()
 Key: FLINK-25117
 URL: https://issues.apache.org/jira/browse/FLINK-25117
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.13.3
 Environment: official Docker image, flink:1.13.2-scala_2.12
Reporter: zzt


{code:java}

Flink SQL> insert into `wide_order` (`user_id`, `row_num`, `sum`)
> select `t`.`receiver_user_id`, `t`.`rowNum`, `t`.`total`
> from (select `t`.`receiver_user_id`, `t`.`total`, ROW_NUMBER() OVER (ORDER BY 
> total desc) as `rowNum`
>       from (select `order_view`.`receiver_user_id`, sum(`order_view`.`total`) 
> as `total`
>             from `order_view` where create_time > '2021-11-01 00:24:55.453'
>             group by `order_view`.`receiver_user_id`) `t`) `t`
> where `rowNum` <= 1;


Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoSuchMethodError: 
org.apache.flink.table.catalog.CatalogManager$TableLookupResult.getCatalog()Ljava/util/Optional;
    at 
org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.extractTableStats(DatabaseCalciteSchema.java:106)
    at 
org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getStatistic(DatabaseCalciteSchema.java:90)
    at 
org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:79)
    at java.util.Optional.map(Optional.java:215)
    at 
org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:74)
    at 
org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
    at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
    at 
org.apache.calcite.sql.validate.SqlValidatorUtil.getTableEntryFrom(SqlValidatorUtil.java:1059)
    at 
org.apache.calcite.sql.validate.SqlValidatorUtil.getTableEntry(SqlValidatorUtil.java:1016)
    at 
org.apache.calcite.prepare.CalciteCatalogReader.getTable(CalciteCatalogReader.java:119)
    at 
org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.getTable(FlinkCalciteCatalogReader.java:86)
    at 
org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:116)
    at 
org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:56)
    at 
org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:47)
    at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:113)
    at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
    at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
    at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at 
org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:176)
    at 
org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
    at 
org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:176)
    at 
org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:385)
    at 
org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:326)
    at 
org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
    at 
org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    ... 1 more
Shutting down the session...
done. {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Jingsong Li
Amazing!

Thanks Yingjie and all contributors for your great work.

Best,
Jingsong

On Wed, Dec 1, 2021 at 10:52 AM Yun Tang  wrote:
>
> Great news!
> Thanks to everyone who contributed to this project.
>
> Best
> Yun Tang
>
> On 2021/11/30 16:30:52 Till Rohrmann wrote:
> > Great news, Yingjie. Thanks a lot for sharing this information with the
> > community and kudos to all the contributors of the external shuffle service
> > :-)
> >
> > Cheers,
> > Till
> >
> > On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao  wrote:
> >
> > > Hi dev & users,
> > >
> > > We are happy to announce the open-sourcing of the remote shuffle
> > > project [1] for Flink. The project originated in Alibaba, and its main
> > > motivation is to improve batch data processing in terms of both
> > > performance and stability, and to further embrace cloud-native
> > > deployments. For more details about the project's features, please
> > > refer to [1].
> > >
> > > Before going open source, the project was used widely in production
> > > and it has performed well in terms of both stability and performance.
> > > We hope you enjoy it. Collaboration and feedback are highly appreciated.
> > >
> > > Best,
> > > Yingjie on behalf of all contributors
> > >
> > > [1] https://github.com/flink-extended/flink-remote-shuffle
> > >
> >



-- 
Best, Jingsong Lee


[jira] [Created] (FLINK-25116) Fabric8FlinkKubeClientITCase hangs on Azure

2021-11-30 Thread Yun Tang (Jira)
Yun Tang created FLINK-25116:


 Summary: Fabric8FlinkKubeClientITCase hangs on Azure
 Key: FLINK-25116
 URL: https://issues.apache.org/jira/browse/FLINK-25116
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes, Tests
Reporter: Yun Tang


Instance: 
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=27208&view=logs&j=bea52777-eaf8-5663-8482-18fbc3630e81&t=b2642e3a-5b86-574d-4c8a-f7e2842bfb14]

 
{code:java}
2021-11-29T13:18:56.6420610Z Nov 29 13:18:56 Invoking mvn with 
'/home/vsts/maven_cache/apache-maven-3.2.5/bin/mvn 
-Dmaven.repo.local=/home/vsts/work/1/.m2/repository 
-Dmaven.wagon.http.pool=false -Dorg.slf4j.simpleLogger.showDateTime=true 
-Dorg.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss.SSS 
-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn
 --no-snapshot-updates -B -Dhadoop.version=2.8.3 -Dinclude_hadoop_aws 
-Dscala-2.12  --settings 
/home/vsts/work/1/s/tools/ci/google-mirror-settings.xml  test 
-Dlog.dir=/home/vsts/work/_temp/debug_files 
-Dlog4j.configurationFile=file:///home/vsts/work/1/s/flink-end-to-end-tests/../tools/ci/log4j.properties
 -Dtest=org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClientITCase'

2021-11-29T13:19:16.0638794Z Nov 29 13:19:16 [INFO] --- 
maven-surefire-plugin:3.0.0-M5:test (default-test) @ flink-kubernetes ---
2021-11-29T17:10:39.7133994Z 
==
2021-11-29T17:10:39.7134714Z === WARNING: This task took already 95% of the 
available time budget of 282 minutes === {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Jing Zhang
Amazing!
A remote shuffle service is an important improvement for the batch data
processing experience on Flink.
It is also a strong requirement in our internal batch business; we will
try it soon and give you feedback.

Best,
Jing Zhang

On Wed, Dec 1, 2021 at 3:25 AM, Martijn Visser  wrote:

> Hi Yingjie,
>
> This is great, thanks for sharing. Will you also add it to
> https://flink-packages.org/ ?
>
> Best regards,
>
> Martijn
>
> On Tue, 30 Nov 2021 at 17:31, Till Rohrmann  wrote:
>
> > Great news, Yingjie. Thanks a lot for sharing this information with the
> > community and kudos to all the contributors of the external shuffle
> service
> > :-)
> >
> > Cheers,
> > Till
> >
> > On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao 
> > wrote:
> >
> > > Hi dev & users,
> > >
> > > We are happy to announce the open-sourcing of the remote shuffle
> > > project [1] for Flink. The project originated in Alibaba, and its main
> > > motivation is to improve batch data processing in terms of both
> > > performance and stability, and to further embrace cloud-native
> > > deployments. For more details about the project's features, please
> > > refer to [1].
> > >
> > > Before going open source, the project was used widely in production
> > > and it has performed well in terms of both stability and performance.
> > > We hope you enjoy it. Collaboration and feedback are highly appreciated.
> > >
> > > Best,
> > > Yingjie on behalf of all contributors
> > >
> > > [1] https://github.com/flink-extended/flink-remote-shuffle
> > >
> >
>


Re: [DISCUSS] Deprecate Java 8 support

2021-11-30 Thread wenlong.lwl
Hi @Chesnay Schepler, could you explain more about what would happen when
Java 8 is deprecated but still supported? IMO, if we still build packages
based on Java 8, which seems to be the consensus, we still cannot take
advantage of the benefits you mentioned even if we announce that Java 8
support is deprecated.


Best,
Wenlong

On Mon, 29 Nov 2021 at 17:22, Marios Trivyzas  wrote:

> +1 from me as well on the Java 8 deprecation!
> It's important to make the users aware, and "force" them but also the
> communities of other
> related projects (like the aforementioned Hive) to start preparing for the
> future drop of Java 8
> support and the upgrade to the recent stable versions.
>
>
> On Sun, Nov 28, 2021 at 11:15 PM Thomas Weise  wrote:
>
> > +1 for Java 8 deprecation. It's an important signal for users and we
> > need to give sufficient time to adopt. Thanks Chesnay for starting the
> > discussion! Maybe user@ can be included into this discussion?
> >
> > Thomas
> >
> >
> > On Fri, Nov 26, 2021 at 6:49 AM Becket Qin  wrote:
> > >
> > > Thanks for raising the discussion, Chesnay.
> > >
> > > I think it is OK to deprecate Java 8 to let users know that the Java 11
> > > migration should be put on their schedule. However, according to some
> > > statistics of Java version adoption [1], a large number of users are
> > > still using Java 8 in production. I doubt that Java 8 usage will drop to
> > > a negligible amount within the next 2 - 3 Flink releases. I would suggest
> > > keeping the time to drop Java 8 support flexible.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > [1] https://www.infoq.com/news/2021/07/snyk-jvm-2021/
> > >
> > > On Fri, Nov 26, 2021 at 5:09 AM Till Rohrmann 
> > wrote:
> > >
> > > > +1 for the deprecation and reaching out to the user ML to ask for
> > feedback
> > > > from our users. Thanks for driving this Chesnay!
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Thu, Nov 25, 2021 at 10:15 AM Roman Khachatryan  >
> > > > wrote:
> > > >
> > > > > The situation is probably a bit different now compared to the
> > previous
> > > > > upgrade: some users might be using Amazon Corretto (or other builds)
> > > > > which have longer support.
> > > > >
> > > > > Still +1 for deprecation to trigger migration, and thanks for
> > bringing
> > > > > this up!
> > > > >
> > > > > Regards,
> > > > > Roman
> > > > >
> > > > > On Thu, Nov 25, 2021 at 10:09 AM Arvid Heise 
> > wrote:
> > > > > >
> > > > > > +1 to deprecate Java 8, so we can hopefully incorporate the
> module
> > > > > concept
> > > > > > in Flink.
> > > > > >
> > > > > > On Thu, Nov 25, 2021 at 9:49 AM Chesnay Schepler <
> > ches...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > Users can already use APIs from Java 8/11.
> > > > > > >
> > > > > > > On 25/11/2021 09:35, Francesco Guardiani wrote:
> > > > > > > > +1 to what both Ingo and Matthias said; personally, I cannot
> > wait
> > > > to
> > > > > > > start using some of
> > > > > > > > the APIs introduced in Java 9. And I'm pretty sure that's the
> > same
> > > > > for
> > > > > > > our users as well.
> > > > > > > >
> > > > > > > > On Tuesday, 23 November 2021 13:35:07 CET Ingo Bürk wrote:
> > > > > > > >> Hi everyone,
> > > > > > > >>
> > > > > > > >> continued support for Java 8 can also create project risks,
> > e.g.
> > > > if
> > > > > a
> > > > > > > >> vulnerability arises in Flink's dependencies and we cannot
> > upgrade
> > > > > them
> > > > > > > >> because they no longer support Java 8. Some projects already
> > > > started
> > > > > > > >> deprecating support as well, like Kafka, and other projects
> > will
> > > > > likely
> > > > > > > >> follow.
> > > > > > > >> Let's also keep in mind that the proposal here is not to
> drop
> > > > > support
> > > > > > > right
> > > > > > > >> away, but to deprecate it, send the message, and motivate
> > users to
> > > > > start
> > > > > > > >> migrating. Delaying this process could ironically mean users
> > have
> > > > > less
> > > > > > > time
> > > > > > > >> to prepare for it.
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> Ingo
> > > > > > > >>
> > > > > > > >> On Tue, Nov 23, 2021 at 8:54 AM Matthias Pohl <
> > > > > matth...@ververica.com>
> > > > > > > >>
> > > > > > > >> wrote:
> > > > > > > >>> Thanks for constantly driving these maintenance topics,
> > Chesnay.
> > > > +1
> > > > > > > from
> > > > > > > >>> my
> > > > > > > >>> side for deprecating Java 8. I see the point Jingsong is
> > raising.
> > > > > But I
> > > > > > > >>> agree with what David already said here. Deprecating the
> Java
> > > > > version
> > > > > > > is a
> > > > > > > >>> tool to make users aware of it (same as starting this
> > discussion
> > > > > > > thread).
> > > > > > > >>> If there's no major opposition against deprecating it in
> the
> > > > > community
> > > > > > > we
> > > > > > > >>> should move forward in this regard to make the users who do
> > not
> > > > > > > >>> regularly 

Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Martijn Visser
Hi Yingjie,

This is great, thanks for sharing. Will you also add it to
https://flink-packages.org/ ?

Best regards,

Martijn

On Tue, 30 Nov 2021 at 17:31, Till Rohrmann  wrote:

> Great news, Yingjie. Thanks a lot for sharing this information with the
> community and kudos to all the contributors of the external shuffle service
> :-)
>
> Cheers,
> Till
>
> On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao 
> wrote:
>
> > Hi dev & users,
> >
> > We are happy to announce the open-sourcing of the remote shuffle
> > project [1] for Flink. The project originated in Alibaba, and its main
> > motivation is to improve batch data processing in terms of both
> > performance and stability, and to further embrace cloud-native
> > deployments. For more details about the project's features, please
> > refer to [1].
> >
> > Before going open source, the project was used widely in production
> > and it has performed well in terms of both stability and performance.
> > We hope you enjoy it. Collaboration and feedback are highly appreciated.
> >
> > Best,
> > Yingjie on behalf of all contributors
> >
> > [1] https://github.com/flink-extended/flink-remote-shuffle
> >
>


Re: [VOTE] FLIP-175: Compose Estimator/Model/AlgoOperator from DAG of Estimator/Model/AlgoOperator

2021-11-30 Thread Dong Lin
Thanks everyone for your votes!

The proposal has passed with the following votes:

+1 (Binding): 3 (Jiangjie, Yun, Dian)
+1 (Non-binding): 0
-1: 0

Cheers,
Dong


On Tue, Nov 30, 2021 at 6:21 PM Dian Fu  wrote:

> +1
>
> Regards,
> Dian
>
> On Tue, Nov 30, 2021 at 12:08 PM Yun Gao 
> wrote:
>
> > +1 for the new GraphBuilder API. It supports more complex
> > estimator / model structures and helps to reduce repeated
> > code. Thanks Dong for the proposal.
> >
> > Best,
> > Yun
> >
> >
> > --
> > From:Becket Qin 
> > Send Time:2021 Nov. 22 (Mon.) 11:45
> > To:dev 
> > Subject:Re: [VOTE] FLIP-175: Compose Estimator/Model/AlgoOperator from
> DAG
> > of Estimator/Model/AlgoOperator
> >
> > +1 on the latest proposal. Thanks for the writeup, Dong.
> >
> > The GraphBuilder API is useful and also relatively more complicated than
> > the Pipeline API. It would probably take some time for the users to get
> > familiar with the usage. Good documentation would be really helpful here.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Oct 15, 2021 at 2:11 PM Dong Lin  wrote:
> >
> > > Hi all,
> > >
> > > We would like to start the vote for FLIP-175: Compose
> > > Estimator/Model/AlgoOperator from DAG of Estimator/Model/AlgoOperator
> > [1].
> > > This FLIP was discussed in this thread [2].
> > >
> > > With this FLIP, users can compose an Estimator from a DAG of
> > > Estimator/Model/AlgoOperator by describing this DAG just once without
> > > having to separately describe the DAG for the Model fitted by this
> > > Estimator.
> > >
> > > The vote will be open for at least 72 hours, following the consensus
> > voting
> > > process.
> > >
> > > Thanks!
> > > Dong Lin and Zhipeng Zhang
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181311363
> > > [2]
> > >
> > >
> >
> https://lists.apache.org/thread.html/r3bb7c2c145635f98ae22c5a917c7f0ce80265dd00ffecd754d8bedf8%40%3Cdev.flink.apache.org%3E
> > >
> >
>


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Till Rohrmann
Great news, Yingjie. Thanks a lot for sharing this information with the
community and kudos to all the contributors of the external shuffle service
:-)

Cheers,
Till

On Tue, Nov 30, 2021 at 2:32 PM Yingjie Cao  wrote:

> Hi dev & users,
>
> We are happy to announce the open-sourcing of the remote shuffle project [1]
> for Flink. The project originated in Alibaba, and its main motivation is to
> improve batch data processing in terms of both performance and stability, and
> to further embrace cloud-native deployments. For more details about the
> project's features, please refer to [1].
>
> Before going open source, the project was used widely in production and it
> has performed well in terms of both stability and performance. We hope you
> enjoy it. Collaboration and feedback are highly appreciated.
>
> Best,
> Yingjie on behalf of all contributors
>
> [1] https://github.com/flink-extended/flink-remote-shuffle
>


Re: [DISCUSS] FLIP-194: Introduce the JobResultStore

2021-11-30 Thread Till Rohrmann
Given the other breaking changes, I think that it is ok to remove the
`RunningJobsRegistry` completely.

Since we allow users to specify a HighAvailabilityServices implementation
when starting Flink via `high-availability: FQDN`, I think we should mark
the interface at least @Experimental.

Cheers,
Till

On Tue, Nov 30, 2021 at 2:29 PM Mika Naylor  wrote:

> Hi Till,
>
> We thought that breaking interfaces, specifically
> HighAvailabilityServices and RunningJobsRegistry, was acceptable in this
> instance because:
>
> - Neither of these interfaces are marked @Public and so carry no
>guarantees about being public and stable.
> - As far as we are aware, we currently have no users with custom
>HighAvailabilityServices implementations.
> - The interface was already broken in 1.14 with the changes to
>CheckpointRecoveryFactory, and will likely be changed again in 1.15
>due to further changes in that factory.
>
> Given that, we thought changes to the interface would not be disruptive.
> Perhaps it could be annotated as @Internal - I'm not sure exactly what
> guarantees we try and give for the stability of the
> HighAvailabilityServices interface.
>
> Kind regards,
> Mika
>
> On 26.11.2021 18:28, Till Rohrmann wrote:
> >Thanks for creating this FLIP Matthias, Mika and David.
> >
> >I think the JobResultStore is an important piece for fixing Flink's last
> >high-availability problem (afaik). Once we have this piece in place, users
> >no longer risk re-executing a successfully completed job.
> >
> >I have one comment concerning breaking interfaces:
> >
> >If we don't want to break interfaces, then we could keep the
> >HighAvailabilityServices.getRunningJobsRegistry() method and add a default
> >implementation for HighAvailabilityServices.getJobResultStore(). We could
> >then deprecate the former method and then remove it in the subsequent
> >release (1.16).
> >
> >Apart from that, +1 for the FLIP.
> >
> >Cheers,
> >Till
> >
> >On Wed, Nov 17, 2021 at 6:05 PM David Morávek  wrote:
> >
> >> Hi everyone,
> >>
> >> Matthias, Mika and I want to start a discussion about introduction of a
> new
> >> Flink component, the *JobResultStore*.
> >>
> >> The main motivation is to address shortcomings of the
> *RunningJobsRegistry*
> >> and supersede it with the new component. These shortcomings have been
> first
> >> described in FLINK-11813 [1].
> >>
> >> This change should improve the overall stability of the JobManager's
> >> components and address the race conditions in some of the fail over
> >> scenarios during the job cleanup lifecycle.
> >>
> >> It should also help to ensure that Flink doesn't leave any uncleaned
> >> resources behind.
> >>
> >> We've prepared a FLIP-194 [2], which outlines the design and reasoning
> >> behind this new component.
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-11813
> >> [2]
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195726435
> >>
> >> We're looking forward to your feedback ;)
> >>
> >> Best,
> >> Matthias, Mika and David
> >>
>
> Mika Naylor
> https://autophagy.io
>
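
A minimal sketch in Java of the non-breaking evolution discussed above: keep
the deprecated accessor and give the new one a default implementation, so
existing custom implementations keep compiling. The JobResultStore type is the
FLIP's proposal; the stub interfaces and the method bodies are illustrative
only, not the actual Flink source:

interface RunningJobsRegistry {}

interface JobResultStore {}

interface HighAvailabilityServices {

    /**
     * @deprecated Use {@link #getJobResultStore()} instead; to be removed
     * in a subsequent release (e.g. 1.16).
     */
    @Deprecated
    RunningJobsRegistry getRunningJobsRegistry();

    // A default implementation keeps existing custom implementations
    // source-compatible until they migrate to the new component.
    default JobResultStore getJobResultStore() {
        throw new UnsupportedOperationException(
                "This HighAvailabilityServices implementation does not "
                        + "provide a JobResultStore yet.");
    }
}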


[jira] [Created] (FLINK-25115) Why do the Flink sink operator metrics numRecordsOut and numRecordsOutPerSecond always equal 0?

2021-11-30 Thread hjw (Jira)
hjw created FLINK-25115:
---

 Summary: Why do the Flink sink operator metrics numRecordsOut and 
numRecordsOutPerSecond always equal 0?
 Key: FLINK-25115
 URL: https://issues.apache.org/jira/browse/FLINK-25115
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.13.2
Reporter: hjw
 Attachments: image-2021-11-30-23-56-26-222.png

I submitted a Flink SQL job and found that the numRecordsOut and 
numRecordsOutPerSecond metrics are always 0.

 

!image-2021-11-30-23-56-26-222.png!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25114) Remove flink-scala dependency from flink-table-runtime

2021-11-30 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25114:
---

 Summary: Remove flink-scala dependency from flink-table-runtime
 Key: FLINK-25114
 URL: https://issues.apache.org/jira/browse/FLINK-25114
 Project: Flink
  Issue Type: Sub-task
Reporter: Francesco Guardiani
Assignee: Francesco Guardiani


flink-scala should no longer be necessary for flink-table-runtime. We should 
try to remove it in order to help with the parent task 
[FLINK-24427|https://issues.apache.org/jira/browse/FLINK-24427]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 Thread Yingjie Cao
Hi dev & users,

We are happy to announce the open-sourcing of the remote shuffle project [1]
for Flink. The project originated in Alibaba, and its main motivation is to
improve batch data processing in terms of both performance and stability, and
to further embrace cloud-native deployments. For more details about the
project's features, please refer to [1].

Before going open source, the project was used widely in production and it
has performed well in terms of both stability and performance. We hope you
enjoy it. Collaboration and feedback are highly appreciated.

Best,
Yingjie on behalf of all contributors

[1] https://github.com/flink-extended/flink-remote-shuffle


Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-11-30 Thread Timo Walther

Response to Wenlong's feedback:

> I would prefer not to provide such a shortcut; let users use COMPILE 
PLAN IF NOT EXISTS and EXECUTE explicitly, which can be understood by 
new users even without referring to the docs.


I would like to hear more opinions on this topic. Personally, I find a 
combined statement very useful, not only for quicker development and 
debugging but also for readability: it helps keep the JSON path 
and the query close to each other in order to know the origin of the plan.
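
For illustration, a rough sketch of the two styles under discussion, using
the syntax proposed in the FLIP at the time (file path and table names are
placeholders; the exact keyword order may differ in the final implementation):

-- Explicit two-step flow:
COMPILE PLAN IF NOT EXISTS '/plans/query.json'
FOR INSERT INTO sink_table SELECT * FROM source_table;

EXECUTE PLAN '/plans/query.json';

-- Combined shortcut under discussion, keeping plan file and query together:
COMPILE AND EXECUTE PLAN '/plans/query.json'
FOR INSERT INTO sink_table SELECT * FROM source_table;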


> but the plan and SQL are not matched. The result would be quite 
confusing if we still execute the plan directly; we may need to add a 
validation.


You are right that there could be a mismatch. But we have a similar 
problem when executing CREATE TABLE IF NOT EXISTS. The schema or options 
of a table could have changed completely in the catalog but the CREATE 
TABLE IF NOT EXISTS is not executed again. So a mismatch could also 
occur there.


Regards,
Timo

On 30.11.21 14:17, Timo Walther wrote:

Hi everyone,

Thanks for the feedback so far. Let me answer each email individually.

I will start with a response to Ingo's feedback:

 > Will the JSON plan's schema be considered an API?

No, not in the first version. This is explicitly mentioned in the 
`General JSON Plan Assumptions`. I tried to improve the section once 
more to make it clearer. However, the JSON plan is definitely stable per 
minor version. And since the plan is versioned by Flink version, 
external tooling could be built around it. We might make it public API 
once the design has settled.


 > Given that upgrades across multiple versions at once are unsupported, 
do we verify this somehow?


Good question. I extended the `General JSON Plan Assumptions`. Now yes: 
the Flink version is part of the JSON plan and will be verified during 
restore. But keep in mind that we might support more than just the last 
version at least until the JSON plan has been migrated.


Regards,
Timo

On 30.11.21 09:39, Marios Trivyzas wrote:
I have a question regarding `COMPILE PLAN OVERWRITE`. If we choose to go
with the config option instead, that doesn't provide the flexibility to
overwrite certain plans but not others, since the config applies globally.
Isn't that something to consider?

On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas  
wrote:



Hi Timo!

Thanks a lot for taking all that time and effort to put together this
proposal!

Regarding:
For simplification of the design, we assume that upgrades use a step 
size

of a single
minor version. We don't guarantee skipping minor versions (e.g. 1.11 to
1.14).

I think that for this first step we should make it absolutely clear 
to the

users that they would need to go through all intermediate versions
to end up with the target version they wish. If we are to support 
skipping

versions in the future, i.e. upgrade from 1.14 to 1.17, this means
that we need to have a testing infrastructure in place that would 
test all

possible combinations of version upgrades, i.e. from 1.14 to 1.15,
from 1.14 to 1.16 and so forth, while still testing and of course
supporting all the upgrades from the previous minor version.

I very much like the idea of introducing HINTS to define some behaviour 
in the programs!
- the hints live together with the sql statements and consequently the
(JSON) plans.
- If multiple queries are involved in a program, each one of them can
define its own config (regarding plan optimisation, not null 
enforcement,

etc)

I agree with Francesco on his argument regarding the *JSON* plan. I
believe we should already provide flexibility here, since (who knows) in
the future
a JSON plan might not fulfil the desired functionality.

I also agree that we need some very obvious way (i.e. not a log entry) to
show the users that their program doesn't support version upgrades, and
prevent them from being negatively surprised in the future, when 
trying to

upgrade their production pipelines.

This is an implementation detail, but I'd like to add that there 
should be

some good logging in place when the upgrade is taking place, to be
able to track every restoration action, and help debug any potential
issues arising from that.





On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann 
wrote:

Thanks for writing this FLIP Timo. I think this will be a very 
important

improvement for Flink and our SQL users :-)

Similar to Francesco I would like to understand the statement


For simplification of the design, we assume that upgrades use a step

size
of a single
minor version. We don't guarantee skipping minor versions (e.g. 1.11 to
1.14).

a bit better. Is it because Flink does not guarantee that a savepoint
created by version 1.x can be directly recovered by version 1.y with 
x + 1
< y but users might have to go through a cascade of upgrades? From 
how I
understand your proposal, the compiled plan won't be changed after 
being

written initially. Hence, I would assume that for the plan alone Flink
will
have to 

Re: [DISCUSS] FLIP-194: Introduce the JobResultStore

2021-11-30 Thread Mika Naylor

Hi Till,

We thought that breaking interfaces, specifically
HighAvailabilityServices and RunningJobsRegistry, was acceptable in this
instance because:

- Neither of these interfaces are marked @Public and so carry no
  guarantees about being public and stable.
- As far as we are aware, we currently have no users with custom
  HighAvailabilityServices implementations.
- The interface was already broken in 1.14 with the changes to
  CheckpointRecoveryFactory, and will likely be changed again in 1.15
  due to further changes in that factory.

Given that, we thought changes to the interface would not be disruptive.
Perhaps it could be annotated as @Internal - I'm not sure exactly what
guarantees we try and give for the stability of the
HighAvailabilityServices interface.

Kind regards,
Mika

On 26.11.2021 18:28, Till Rohrmann wrote:

Thanks for creating this FLIP Matthias, Mika and David.

I think the JobResultStore is an important piece for fixing Flink's last
high-availability problem (afaik). Once we have this piece in place, users
no longer risk re-executing a successfully completed job.

I have one comment concerning breaking interfaces:

If we don't want to break interfaces, then we could keep the
HighAvailabilityServices.getRunningJobsRegistry() method and add a default
implementation for HighAvailabilityServices.getJobResultStore(). We could
then deprecate the former method and then remove it in the subsequent
release (1.16).
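
To illustrate, a minimal sketch of that deprecation path (the bridging
adapter class named below is hypothetical, introduced only for this
example):

public interface HighAvailabilityServices {

    /** @deprecated Use {@link #getJobResultStore()} instead; planned for removal in 1.16. */
    @Deprecated
    RunningJobsRegistry getRunningJobsRegistry() throws Exception;

    // Default implementation so existing custom HA services keep compiling;
    // the adapter wraps the legacy registry until it is removed.
    default JobResultStore getJobResultStore() throws Exception {
        return new RunningJobsRegistryBackedJobResultStore(getRunningJobsRegistry());
    }

    // ... remaining HA service methods unchanged ...
}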

Apart from that, +1 for the FLIP.

Cheers,
Till

On Wed, Nov 17, 2021 at 6:05 PM David Morávek  wrote:


Hi everyone,

Matthias, Mika and I want to start a discussion about introduction of a new
Flink component, the *JobResultStore*.

The main motivation is to address shortcomings of the *RunningJobsRegistry*
and surpass it with the new component. These shortcomings have been first
described in FLINK-11813 [1].

This change should improve the overall stability of the JobManager's
components and address the race conditions in some of the fail over
scenarios during the job cleanup lifecycle.

It should also help to ensure that Flink doesn't leave any uncleaned
resources behind.

We've prepared a FLIP-194 [2], which outlines the design and reasoning
behind this new component.

[1] https://issues.apache.org/jira/browse/FLINK-11813
[2]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195726435

We're looking forward to your feedback ;)

Best,
Matthias, Mika and David



Mika Naylor
https://autophagy.io


Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-11-30 Thread Timo Walther

Hi everyone,

thanks for the feedback so far. Let me answer each email individually.

I will start with a response to Ingo's feedback:

> Will the JSON plan's schema be considered an API?

No, not in the first version. This is explicitly mentioned in the 
`General JSON Plan Assumptions`. I tried to improve the section once 
more to make it clearer. However, the JSON plan is definitely stable per 
minor version. And since the plan is versioned by Flink version, 
external tooling could be built around it. We might make it public API 
once the design has settled.
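
As a hedged illustration of such external tooling, assuming the serialized
plan carries a top-level Flink version stamp as described above (the exact
field name is an assumption here):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;

public final class PlanVersionCheck {
    public static void main(String[] args) throws Exception {
        // Read the compiled plan file and report the version it was created with.
        JsonNode plan = new ObjectMapper().readTree(new File(args[0]));
        String planVersion = plan.path("flinkVersion").asText("unknown");
        System.out.println("Plan was compiled with Flink " + planVersion);
    }
}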


> Given that upgrades across multiple versions at once are unsupported, 
do we verify this somehow?


Good question. I extended the `General JSON Plan Assumptions`. Now yes: 
the Flink version is part of the JSON plan and will be verified during 
restore. But keep in mind that we might support more than just the last 
version at least until the JSON plan has been migrated.


Regards,
Timo

On 30.11.21 09:39, Marios Trivyzas wrote:

I have a question regarding the `COMPILE PLAN OVERWRITE`. If we choose to go
with the config option instead,
that doesn't provide the flexibility to overwrite certain plans but not
others, since the config applies globally, isn't that
something to consider?

On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas  wrote:


Hi Timo!

Thanks a lot for taking all that time and effort to put together this
proposal!

Regarding:

For simplification of the design, we assume that upgrades use a step size
of a single
minor version. We don't guarantee skipping minor versions (e.g. 1.11 to
1.14).

I think that for this first step we should make it absolutely clear to the
users that they would need to go through all intermediate versions
to end up with the target version they wish. If we are to support skipping
versions in the future, i.e. upgrade from 1.14 to 1.17, this means
that we need to have a testing infrastructure in place that would test all
possible combinations of version upgrades, i.e. from 1.14 to 1.15,
from 1.14 to 1.16 and so forth, while still testing and of course
supporting all the upgrades from the previous minor version.

I like a lot the idea of introducing HINTS to define some behaviour in the
programs!
- the hints live together with the sql statements and consequently the
(JSON) plans.
- If multiple queries are involved in a program, each one of them can
define its own config (regarding plan optimisation, not null enforcement,
etc)

I agree with Francesco on his argument regarding the *JSON* plan. I
believe we should already provide flexibility here, since (who knows) in
the future
a JSON plan might not fulfil the desired functionality.

I also agree that we need some very obvious way (i.e. not log entry) to
show the users that their program doesn't support version upgrades, and
prevent them from being negatively surprised in the future, when trying to
upgrade their production pipelines.

This is an implementation detail, but I'd like to add that there should be
some good logging in place when the upgrade is taking place, to be
able to track every restoration action, and help debug any potential
issues arising from that.





On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann 
wrote:


Thanks for writing this FLIP Timo. I think this will be a very important
improvement for Flink and our SQL user :-)

Similar to Francesco I would like to understand the statement


For simplification of the design, we assume that upgrades use a step size
of a single
minor version. We don't guarantee skipping minor versions (e.g. 1.11 to
1.14).

a bit better. Is it because Flink does not guarantee that a savepoint
created by version 1.x can be directly recovered by version 1.y with x + 1
< y but users might have to go through a cascade of upgrades? From how I
understand your proposal, the compiled plan won't be changed after being
written initially. Hence, I would assume that for the plan alone Flink will
have to give backwards compatibility guarantees for all versions. Am I
understanding this part correctly?

Cheers,
Till

On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani <
france...@ververica.com>
wrote:


Hi Timo,

Thanks for putting this amazing work together, I have some
considerations/questions about the FLIP:

*Proposed changes #6*: Other than defining this rule of thumb, we must
also make sure that compiling plans with these objects that cannot be
serialized in the plan must fail hard, so users don't bite themselves with
such issues, or at least we need to output warning logs. In general,
whenever the user is trying to use the CompiledPlan APIs and at the same
time, they're trying to do something "illegal" for the plan, we should
immediately either log or fail depending on the issue, in order to avoid
any surprises once the user upgrades. I would also say the same for things
like registering a function, registering a DataStream, and for every other
thing which won't end up in the plan, we should log such info to the user
by default.

[jira] [Created] (FLINK-25113) Cleanup from Parquet and Orc the partition key handling logic

2021-11-30 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25113:
---

 Summary: Cleanup from Parquet and Orc the partition key handling 
logic
 Key: FLINK-25113
 URL: https://issues.apache.org/jira/browse/FLINK-25113
 Project: Flink
  Issue Type: Technical Debt
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Francesco Guardiani


After https://issues.apache.org/jira/browse/FLINK-24617 the partition key 
handling logic is encapsulated within {{FileInfoExtractorBulkFormat}}. We should 
clean up this logic from the ORC and Parquet formats, in order to simplify them. 
Note: Hive still depends on this logic, but it should rather use 
{{FileInfoExtractorBulkFormat}} or similar.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25112) Remove TTL from e2e cache

2021-11-30 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-25112:


 Summary: Remove TTL from e2e cache
 Key: FLINK-25112
 URL: https://issues.apache.org/jira/browse/FLINK-25112
 Project: Flink
  Issue Type: Technical Debt
  Components: Test Infrastructure
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.15.0, 1.14.1, 1.13.4


The e2e cache for Java tests is currently configured to ignore entries if they 
are more than 3 months old.
However, because existing caches are immutable in Azure, this means that if 
something was put into the cache and remained valid for 3 months (e.g., 
because no further Java e2e test was added), every run will ignore the cached 
contents from then on.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-194: Introduce the JobResultStore

2021-11-30 Thread David Morávek
Hi Xintong,

However, it's probably not so good for users who don't need such
> retrieval and already used a ZooKeeper/Native-Kubernetes HA to specify
> another remote FS path for storing job results, even if they are
> automatically cleaned up once committed.
>

Users of ZK / k8s HA are forced to use a remote FS as well.
*DefaultJobGraphStore* and *DefaultCompletedCheckpointStore* are both
implemented using *StateHandleStore*. This means that they are only storing
pointers in metadata store (ZK / ConfigMap) to the actual blobs that are
stored on DFS (these can be quite bulky). The reasoning behind this dates
back to early days when S3 was an eventually consistent FS and didn't
provide strong read-after-write consistency and we needed a consistent view
of the state. We've already started a discussion with Till that we might want
to get rid of the metadata store for both of these in the near future, as it's
no longer needed with modern filesystems and is a source of various race
conditions that are hard to reason about.

We can reuse already configured "high-availability.storageDir" for storing
the results.
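
A small sketch of what that reuse could look like from the configuration
side; the JobResultStore-specific key below is a made-up placeholder, not a
finalized option name:

import org.apache.flink.configuration.Configuration;

// Users of ZK/K8s HA already configure a DFS directory for HA blobs;
// the idea is to co-locate the job results under the same root.
Configuration conf = new Configuration();
conf.setString("high-availability", "zookeeper");
conf.setString("high-availability.storageDir", "hdfs:///flink/ha/");
conf.setString("job-result-store.storage-path", "hdfs:///flink/ha/job-result-store/");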

Best,
D.

On Tue, Nov 30, 2021 at 11:14 AM Xintong Song  wrote:

> Thanks for the explanations, Matthias.
>
> Including JobResultStore in HighAvailabilityServices as a replacement of
> RunningJobRegistry makes sense to me. And initializing JobResultStore in
> the same way initializing JobGraphStore also sounds good.
>
> I have another question concerning where to persist the job results. The
> FLIP proposes two implementations of JobResultStore: In-Memory and
> FileSystem. I'm wondering if we should by default persist the results in HA
> services (i.e., ZooKeeper or Kubernetes ConfigMap) when enabled. This is
> how RunningJobRegistry and JobGraphStore persist things currently.
>
> IIUC, the reason we want a FileSystemJobResultStore is that it allows not
> only the dispatcher but also a 3rd party to retrieve the result after the
> job finishes, making scenarios like multi-stage application mode possible.
> However, it's probably not so good for users who don't need such
> retrieval and already used a ZooKeeper/Native-Kubernetes HA to specify
> another remote FS path for storing job results, even if they are
> automatically cleaned up once committed.
>
> Maybe we can use the HA storage by default, and make the FileSystem opt-in.
> WDYT?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Nov 30, 2021 at 5:35 PM Matthias Pohl 
> wrote:
>
> > Hi Xintong,
> > your observation is correct. We probably didn't address this in the FLIP
> > explicitly enough. We planned to include it in the
> HighAvailabilityServices
> > analogously to the RunningJobRegistry (and replace the RunningJobRegistry
> > by the JobResultStore in the end).
> >
> > One additional thing, I want to point out: The JobResultStore and the
> > JobGraphStore have closely-related lifecycles. Both provide information
> for
> > the Dispatcher's initialization. Therefore, we're planning to initialize
> > the JobResultStore in the same way, the JobGraphStore is initialized in
> > the DispatcherLeaderProcessFactory and referenced
> > in DispatcherLeaderProcess. The Dispatcher will get the information about
> > recovered JobGraphs (as it's currently done on master) and the JobResult
> of
> > globally-terminated jobs that are still marked as "dirty" by the
> > JobResultStore. The cleanup based on the latter ones will then happen in
> > the Dispatcher.
> >
> > Matthias
> >
> > On Tue, Nov 30, 2021 at 3:28 AM Xintong Song 
> > wrote:
> >
> >> Thanks David, Matthias and Mika,
> >>
> >> I like this FLIP in the way it handles potential re-execution and
> >> resource leaks due to clean-up failures.
> >>
> >> I have one question: Why is this JobResultStore not part of the high
> >> availability services? Or ask differently, are there cases that we only
> >> need the HA services but not JobResultStore, or vice versa?
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Tue, Nov 30, 2021 at 9:19 AM Kurt Young  wrote:
> >>
> >>> Hi,
> >>>
> >>> I didn't fully read the FLIP but the name somehow confused me. My first
> >>> impression of
> >>> seeing this is we are providing some storage for job execution results,
> >>> like the one
> >>> returned with accumulators in batch mode. Would a name like
> >>> "JobStautsStore" be more
> >>> appropriate?
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Mon, Nov 29, 2021 at 8:22 PM Zhu Zhu  wrote:
> >>>
> >>> > Thanks for drafting this FLIP, Matthias, Mika and David.
> >>> >
> >>> > I like the proposed JobResultStore. Besides addressing the problem of
> >>> > re-executing finished jobs, it's also an important step towards HA of
> >>> > multi-job Flink applications.
> >>> >
> >>> > I have one question that, in the "Cleanup" section, it shows that the
> >>> > JobMaster is responsible for cleaning up
> >>> CheckpointCounter/CheckpointStore.
> >>> > Does this mean Flink will have to re-create
> >>> > JobMaster/Scheduler/ExecutionGraph for a terminated job to do the cleanup?

[jira] [Created] (FLINK-25111) Introduce config option, to keep previous behaviour

2021-11-30 Thread Marios Trivyzas (Jira)
Marios Trivyzas created FLINK-25111:
---

 Summary: Introduce config option, to keep previous behaviour
 Key: FLINK-25111
 URL: https://issues.apache.org/jira/browse/FLINK-25111
 Project: Flink
  Issue Type: Sub-task
Reporter: Marios Trivyzas


Introduce a new config option which will allow users to keep the previous 
behaviour of `CAST` without any of the fixes/changes. Most importantly, when 
casting fails, *null* will still be returned instead of an error being thrown, 
without requiring the use of *TRY_CAST* to be able to return a default value.
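
A hedged sketch of the behavioural difference this option is about; the
option key below is a guess at this stage and may end up with a different
name:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());
// With the (hypothetical) legacy option enabled, CAST keeps the old
// NULL-on-failure semantics instead of throwing at runtime.
env.getConfig().getConfiguration()
        .setString("table.exec.legacy-cast-behaviour", "enabled");
env.executeSql("SELECT CAST('not-a-number' AS INT)").print();     // NULL under legacy mode
env.executeSql("SELECT TRY_CAST('not-a-number' AS INT)").print(); // NULL by definition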



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] FLIP-188 Introduce Built-in Dynamic Table Storage

2021-11-30 Thread godfrey he
+1 (binding)

Best,
Godfrey

Jark Wu  于2021年11月30日周二 下午5:47写道:

>
> Thanks for the great discussion and updating.
> Still +1 from my side.
>
> Best,
> Jark
>
> On Tue, 30 Nov 2021 at 17:27, Kurt Young  wrote:
>
> > +1 from my side.
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Nov 30, 2021 at 5:12 PM Jingsong Li 
> > wrote:
> >
> > > Hi everyone,
> > >
> > > Many thanks to Stephan and Timo, this makes the design of FLIP much
> > > clearer and more reliable.
> > >
> > > I request that you can take another look at the updated FLIP and
> > > please respond directly if you have feedback.
> > >
> > > (I will contact binding voters directly to confirm)
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Tue, Nov 30, 2021 at 4:32 PM Timo Walther  wrote:
> > > >
> > > > Thanks for the healthy discussion. Also +1 from my side for this FLIP.
> > > >
> > > > Thanks,
> > > > Timo
> > > >
> > > > On 24.11.21 19:05, Stephan Ewen wrote:
> > > > > Thanks for all the details and explanation.
> > > > >
> > > > > With the conclusion of the discussion, also +1 from my side for this
> > > FLIP
> > > > >
> > > > > On Sat, Nov 13, 2021 at 12:23 PM Jingsong Li  > >
> > > wrote:
> > > > >
> > > > >> Thanks Stephan and Timo, I have had a rough look at your replies. They
> > are
> > > > >> all valuable opinions. I will take time to discuss, explain and
> > > > >> improve them.
> > > > >>
> > > > >> Hi Timo,
> > > > >>> At least a final "I will start the vote soon. Last call for
> > > comments."
> > > > >> would have been nice.
> > > > >>
> > > > >> I replied in the DISCUSS thread that we began to vote. If there are
> > > > >> supplementary comments or reply "pause voting first, I will reply
> > > > >> later", we can suspend or cancel the voting at any time.
> > > > >> I understand why the FLIP must take three days to vote, so that more
> > > > >> people can see it and put forward their opinions.
> > > > >>
> > > > >> Best,
> > > > >> Jingsong
> > > > >>
> > > > >> On Sat, Nov 13, 2021 at 1:27 AM Timo Walther 
> > > wrote:
> > > > >>>
> > > > >>> Hi everyone,
> > > > >>>
> > > > >>> even though the DISCUSS thread was open for 2 weeks. I have the
> > > feeling
> > > > >>> that the VOTE was initiated too quickly. At least a final "I will
> > > start
> > > > >>> the vote soon. Last call for comments." would have been nice.
> > > > >>>
> > > > >>> I also added some comments in the DISCUSS thread. Let's hope we can
> > > > >>> resolve those soon.
> > > > >>>
> > > > >>> Regards,
> > > > >>> Timo
> > > > >>>
> > > > >>> On 12.11.21 16:36, Stephan Ewen wrote:
> > > >  Hi all!
> > > > 
> > > >  I have a few questions on the design still, posted those in the
> > > > >> [DISCUSS]
> > > >  thread.
> > > >  It would be great to clarify those first before concluding this
> > > vote.
> > > > 
> > > >  Thanks,
> > > >  Stephan
> > > > 
> > > > 
> > > >  On Fri, Nov 12, 2021 at 7:22 AM Jark Wu  wrote:
> > > > 
> > > > > +1 (binding)
> > > > >
> > > > > Thanks for the great work Jingsong!
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > On Thu, 11 Nov 2021 at 19:41, JING ZHANG 
> > > > >> wrote:
> > > > >
> > > > >> +1 (non-binding)
> > > > >>
> > > > >> A small suggestion:
> > > > >> The message queue is currently used to store the middle layer data
> > > > >> of the streaming data warehouse. We hope to use the built-in dynamic
> > > > >> table storage to store those middle layers.
> > > > >> But the middle data of the streaming data warehouse is often provided
> > > > >> to all business teams in a company. Some teams have not used Apache
> > > > >> Flink as their compute engine yet. In order to continue serving those
> > > > >> teams, the data in the built-in dynamic table storage may need to be
> > > > >> copied to a message queue again.
> > > > >> If *the built-in storage could provide the same consumer API as the
> > > > >> commonly used message queues*, data copying may be avoided. So the
> > > > >> built-in dynamic table storage may be promoted faster in the
> > > > >> streaming data warehouse business.
> > > > >>
> > > > >> Best regards,
> > > > >> Jing Zhang
> > > > >>
> > > > >> Yufei Zhang  于2021年11月11日周四 上午9:34写道:
> > > > >>
> > > > >>> Hi,
> > > > >>>
> > > > >>> +1 (non-binding)
> > > > >>>
> > > > >>> Very interesting design. I saw a lot of discussion on the
> > generic
> > > > >>> interface design, good to know it will address extensibility.
> > > > >>>
> > > > >>> Cheers,
> > > > >>> Yufei
> > > > >>>
> > > > >>>
> > > > >>> On 2021/11/10 02:51:55 Jingsong Li wrote:
> > > >  Hi everyone,
> > > > 
> > > >  Thanks for all the feedback so far. Based on the discussion[1]
> > > we
> > > > > seem
> > > 

[jira] [Created] (FLINK-25110) Flink SQL consumes Kafka up to a certain point in time

2021-11-30 Thread sky (Jira)
sky created FLINK-25110:
---

 Summary: Flink SQL consumes Kafka up to a certain point in time
 Key: FLINK-25110
 URL: https://issues.apache.org/jira/browse/FLINK-25110
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.14.1
Reporter: sky


We sometimes need to preview and compute the data in Kafka as of now, or up to 
some point in time, in order to calculate and verify the reliability of the 
data. In those cases we don't need to keep consuming the data in Kafka 
indefinitely.
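
For reference, the DataStream Kafka connector already supports a bounded
read of this kind; the request amounts to exposing the same in Flink SQL. A
sketch (broker address, topic and timestamp are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Consume from the earliest offsets up to the given point in time,
// then finish instead of consuming indefinitely.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("events")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setBounded(OffsetsInitializer.timestamp(1638230400000L))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();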



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25109) Update jline3 to 3.21.0

2021-11-30 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-25109:
---

 Summary: Update jline3 to 3.21.0
 Key: FLINK-25109
 URL: https://issues.apache.org/jira/browse/FLINK-25109
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Client
Reporter: Sergey Nuyanzin


There are a number of improvements in the new jline which could be used in this FLIP,
e.g.:
- RGB support in styles (could be used for prompts and highlighting)
- line number support in prompt continuation
- autopairing and displaying hints during completion



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] FLIP-175: Compose Estimator/Model/AlgoOperator from DAG of Estimator/Model/AlgoOperator

2021-11-30 Thread Dian Fu
+1

Regards,
Dian

On Tue, Nov 30, 2021 at 12:08 PM Yun Gao 
wrote:

> +1 for the new GraphBuilder API. It supports more complex
> estimator / model structures and helps to reduce repeated
> code. Thanks Dong for the proposal.
>
> Best,
> Yun
>
>
> --
> From:Becket Qin 
> Send Time:2021 Nov. 22 (Mon.) 11:45
> To:dev 
> Subject:Re: [VOTE] FLIP-175: Compose Estimator/Model/AlgoOperator from DAG
> of Estimator/Model/AlgoOperator
>
> +1 on the latest proposal. Thanks for the writeup, Dong.
>
> The GraphBuilder API is useful and also relatively more complicated than
> the Pipeline API. It would probably take some time for the users to get
> familiar with the usage. Good documentation would be really helpful here.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Oct 15, 2021 at 2:11 PM Dong Lin  wrote:
>
> > Hi all,
> >
> > We would like to start the vote for FLIP-175: Compose
> > Estimator/Model/AlgoOperator from DAG of Estimator/Model/AlgoOperator
> [1].
> > This FLIP was discussed in this thread [2].
> >
> > With this FLIP, users can compose an Estimator from a DAG of
> > Estimator/Model/AlgoOperator by describing this DAG just once without
> > having to separately describe the DAG for the Model fitted by this
> > Estimator.
> >
> > The vote will be open for at least 72 hours, following the consensus
> voting
> > process.
> >
> > Thanks!
> > Dong Lin and Zhipeng Zhang
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181311363
> > [2]
> >
> >
> https://lists.apache.org/thread.html/r3bb7c2c145635f98ae22c5a917c7f0ce80265dd00ffecd754d8bedf8%40%3Cdev.flink.apache.org%3E
> >
>
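
To give a flavour of the GraphBuilder API voted on above, a rough sketch
based on the FLIP (method names are approximate and the two stages are
hypothetical; the FLIP page holds the authoritative signatures):

// Describe the DAG once; both the composed Estimator and the Model
// fitted by it are derived from this single description.
GraphBuilder builder = new GraphBuilder();
TableId input = builder.createTableId();

TableId tokenized = builder.getOutputs(new Tokenizer(), input)[0];
TableId predictions = builder.getOutputs(new LogisticRegression(), tokenized)[0];

Estimator<?, ?> graphEstimator =
        builder.buildEstimator(new TableId[] {input}, new TableId[] {predictions});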


Re: [DISCUSS] FLIP-194: Introduce the JobResultStore

2021-11-30 Thread Xintong Song
Thanks for the explanations, Matthias.

Including JobResultStore in HighAvailabilityServices as a replacement of
RunningJobRegistry makes sense to me. And initializing JobResultStore in
the same way initializing JobGraphStore also sounds good.

I have another question concerning where to persist the job results. The
FLIP proposes two implementations of JobResultStore: In-Memory and
FileSystem. I'm wondering if we should by default persist the results in HA
services (i.e., ZooKeeper or Kubernetes ConfigMap) when enabled. This is
how RunningJobRegistry and JobGraphStore persist things currently.

IIUC, the reason we want a FileSystemJobResultStore is that it allows not
only the dispatcher but also a 3rd party to retrieve the result after the
job finishes, making scenarios like multi-stage application mode possible.
However, it's probably not so good for users who don't need such
retrieval and already used a ZooKeeper/Native-Kubernetes HA to specify
another remote FS path for storing job results, even if they are
automatically cleaned up once committed.

Maybe we can use the HA storage by default, and make the FileSystem opt-in.
WDYT?

Thank you~

Xintong Song



On Tue, Nov 30, 2021 at 5:35 PM Matthias Pohl 
wrote:

> Hi Xintong,
> your observation is correct. We probably didn't address this in the FLIP
> explicitly enough. We planned to include it in the HighAvailabilityServices
> analogously to the RunningJobRegistry (and replace the RunningJobRegistry
> by the JobResultStore in the end).
>
> One additional thing, I want to point out: The JobResultStore and the
> JobGraphStore have closely-related lifecycles. Both provide information for
> the Dispatcher's initialization. Therefore, we're planning to initialize
> the JobResultStore in the same way, the JobGraphStore is initialized in
> the DispatcherLeaderProcessFactory and referenced
> in DispatcherLeaderProcess. The Dispatcher will get the information about
> recovered JobGraphs (as it's currently done on master) and the JobResult of
> globally-terminated jobs that are still marked as "dirty" by the
> JobResultStore. The cleanup based on the latter ones will then happen in
> the Dispatcher.
>
> Matthias
>
> On Tue, Nov 30, 2021 at 3:28 AM Xintong Song 
> wrote:
>
>> Thanks David, Matthias and Mika,
>>
>> I like this FLIP in the way it handles potential re-execution and
>> resource leaks due to clean-up failures.
>>
>> I have one question: Why is this JobResultStore not part of the high
>> availability services? Or ask differently, are there cases that we only
>> need the HA services but not JobResultStore, or vice versa?
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Tue, Nov 30, 2021 at 9:19 AM Kurt Young  wrote:
>>
>>> Hi,
>>>
>>> I didn't fully read the FLIP but the name somehow confused me. My first
>>> impression of
>>> seeing this is we are providing some storage for job execution results,
>>> like the one
>>> returned with accumulators in batch mode. Would a name like
>>> "JobStautsStore" be more
>>> appropriate?
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Mon, Nov 29, 2021 at 8:22 PM Zhu Zhu  wrote:
>>>
>>> > Thanks for drafting this FLIP, Matthias, Mika and David.
>>> >
>>> > I like the proposed JobResultStore. Besides addressing the problem of
>>> > re-executing finished jobs, it's also an important step towards HA of
>>> > multi-job Flink applications.
>>> >
>>> > I have one question that, in the "Cleanup" section, it shows that the
>>> > JobMaster is responsible for cleaning up
>>> CheckpointCounter/CheckpointStore.
>>> > Does this mean Flink will have to re-create
>>> > JobMaster/Scheduler/ExecutionGraph for a terminated job to do the
>>> cleanup?
>>> > If so, this can be heavy in certain cases because the ExecutionGraph
>>> > creation may conduct connector initialization. So I'm thinking whether
>>> it's
>>> > possible to make CheckpointCounter/CheckpointStore a component of
>>> > Dispatcher?
>>> >
>>> > Thanks,
>>> > Zhu
>>> >
>>> > Till Rohrmann  于2021年11月27日周六 上午1:29写道:
>>> >
>>> > > Thanks for creating this FLIP Matthias, Mika and David.
>>> > >
>>> > > I think the JobResultStore is an important piece for fixing Flink's
>>> last
>>> > > high-availability problem (afaik). Once we have this piece in place,
>>> > users
>>> > > no longer risk re-executing a successfully completed job.
>>> > >
>>> > > I have one comment concerning breaking interfaces:
>>> > >
>>> > > If we don't want to break interfaces, then we could keep the
>>> > > HighAvailabilityServices.getRunningJobsRegistry() method and add a
>>> > default
>>> > > implementation for HighAvailabilityServices.getJobResultStore(). We
>>> could
>>> > > then deprecate the former method and then remove it in the subsequent
>>> > > release (1.16).
>>> > >
>>> > > Apart from that, +1 for the FLIP.
>>> > >
>>> > > Cheers,
>>> > > Till
>>> > >
>>> > > On Wed, Nov 17, 2021 at 6:05 PM David Morávek 
>>> wrote:
>>> > >
>>> > > > Hi everyone,
>>> > > >
>>> > > > Matthias, Mika and I want to start a 

Re: [DISCUSS] FLIP-194: Introduce the JobResultStore

2021-11-30 Thread Matthias Pohl
Hi Kurt,
thanks for sharing your concerns. Our naming is based on the fact that
there is already a JobResult class. That's the metadata container we store
in the JobResultStore. That JobResult is furthermore used in the REST API
where (ironically) it is handled by the JobExecutionResultHandler. The
naming of the RestHandler is unfortunate in my opinion as it serves the
JobResult and not the JobExecutionResult. Even its JavaDoc is pointing to
the wrong class and the JSON object class JobExecutionResultResponse refers
to JobExecutionResult in its class name.

Does that reasoning make sense to you?

Best,
Matthias

On Tue, Nov 30, 2021 at 2:19 AM Kurt Young  wrote:

> Hi,
>
> I didn't fully read the FLIP but the name somehow confused me. My first
> impression of
> seeing this is we are providing some storage for job execution results,
> like the one
> returned with accumulators in batch mode. Would a name like
> "JobStautsStore" be more
> appropriate?
>
> Best,
> Kurt
>
>
> On Mon, Nov 29, 2021 at 8:22 PM Zhu Zhu  wrote:
>
>> Thanks for drafting this FLIP, Matthias, Mika and David.
>>
>> I like the proposed JobResultStore. Besides addressing the problem of
>> re-executing finished jobs, it's also an important step towards HA of
>> multi-job Flink applications.
>>
>> I have one question that, in the "Cleanup" section, it shows that the
>> JobMaster is responsible for cleaning up
>> CheckpointCounter/CheckpointStore.
>> Does this mean Flink will have to re-create
>> JobMaster/Scheduler/ExecutionGraph for a terminated job to do the cleanup?
>> If so, this can be heavy in certain cases because the ExecutionGraph
>> creation may conduct connector initialization. So I'm thinking whether
>> it's
>> possible to make CheckpointCounter/CheckpointStore a component of
>> Dispatcher?
>>
>> Thanks,
>> Zhu
>>
>> Till Rohrmann  于2021年11月27日周六 上午1:29写道:
>>
>> > Thanks for creating this FLIP Matthias, Mika and David.
>> >
>> > I think the JobResultStore is an important piece for fixing Flink's last
>> > high-availability problem (afaik). Once we have this piece in place,
>> users
>> > no longer risk re-executing a successfully completed job.
>> >
>> > I have one comment concerning breaking interfaces:
>> >
>> > If we don't want to break interfaces, then we could keep the
>> > HighAvailabilityServices.getRunningJobsRegistry() method and add a
>> default
>> > implementation for HighAvailabilityServices.getJobResultStore(). We
>> could
>> > then deprecate the former method and then remove it in the subsequent
>> > release (1.16).
>> >
>> > Apart from that, +1 for the FLIP.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Wed, Nov 17, 2021 at 6:05 PM David Morávek  wrote:
>> >
>> > > Hi everyone,
>> > >
>> > > Matthias, Mika and I want to start a discussion about introduction of
>> a
>> > new
>> > > Flink component, the *JobResultStore*.
>> > >
>> > > The main motivation is to address shortcomings of the
>> > *RunningJobsRegistry*
>> > > and surpass it with the new component. These shortcomings have been
>> first
>> > > described in FLINK-11813 [1].
>> > >
>> > > This change should improve the overall stability of the JobManager's
>> > > components and address the race conditions in some of the fail over
>> > > scenarios during the job cleanup lifecycle.
>> > >
>> > > It should also help to ensure that Flink doesn't leave any uncleaned
>> > > resources behind.
>> > >
>> > > We've prepared a FLIP-194 [2], which outlines the design and reasoning
>> > > behind this new component.
>> > >
>> > > [1] https://issues.apache.org/jira/browse/FLINK-11813
>> > > [2]
>> > >
>> >
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195726435
>> > >
>> > > We're looking forward to your feedback ;)
>> > >
>> > > Best,
>> > > Matthias, Mika and David
>> > >
>> >
>
>


Re: [VOTE] FLIP-188 Introduce Built-in Dynamic Table Storage

2021-11-30 Thread Jark Wu
Thanks for the great discussion and updating.
Still +1 from my side.

Best,
Jark

On Tue, 30 Nov 2021 at 17:27, Kurt Young  wrote:

> +1 from my side.
>
> Best,
> Kurt
>
>
> On Tue, Nov 30, 2021 at 5:12 PM Jingsong Li 
> wrote:
>
> > Hi everyone,
> >
> > Many thanks to Stephan and Timo, this makes the design of FLIP much
> > clearer and more reliable.
> >
> > I request that you can take another look at the updated FLIP and
> > please respond directly if you have feedback.
> >
> > (I will contact binding voters directly to confirm)
> >
> > Best,
> > Jingsong
> >
> > On Tue, Nov 30, 2021 at 4:32 PM Timo Walther  wrote:
> > >
> > > Thanks for the healthy discussion. Also +1 from my side for this FLIP.
> > >
> > > Thanks,
> > > Timo
> > >
> > > On 24.11.21 19:05, Stephan Ewen wrote:
> > > > Thanks for all the details and explanation.
> > > >
> > > > With the conclusion of the discussion, also +1 from my side for this
> > FLIP
> > > >
> > > > On Sat, Nov 13, 2021 at 12:23 PM Jingsong Li  >
> > wrote:
> > > >
> > > >> Thanks Stephan and Timo, I have had a rough look at your replies. They
> are
> > > >> all valuable opinions. I will take time to discuss, explain and
> > > >> improve them.
> > > >>
> > > >> Hi Timo,
> > > >>> At least a final "I will start the vote soon. Last call for
> > comments."
> > > >> would have been nice.
> > > >>
> > > >> I replied in the DISCUSS thread that we began to vote. If there are
> > > >> supplementary comments or reply "pause voting first, I will reply
> > > >> later", we can suspend or cancel the voting at any time.
> > > >> I understand why the FLIP must take three days to vote, so that more
> > > >> people can see it and put forward their opinions.
> > > >>
> > > >> Best,
> > > >> Jingsong
> > > >>
> > > >> On Sat, Nov 13, 2021 at 1:27 AM Timo Walther 
> > wrote:
> > > >>>
> > > >>> Hi everyone,
> > > >>>
> > > >>> even though the DISCUSS thread was open for 2 weeks. I have the
> > feeling
> > > >>> that the VOTE was initiated too quickly. At least a final "I will
> > start
> > > >>> the vote soon. Last call for comments." would have been nice.
> > > >>>
> > > >>> I also added some comments in the DISCUSS thread. Let's hope we can
> > > >>> resolve those soon.
> > > >>>
> > > >>> Regards,
> > > >>> Timo
> > > >>>
> > > >>> On 12.11.21 16:36, Stephan Ewen wrote:
> > >  Hi all!
> > > 
> > >  I have a few questions on the design still, posted those in the
> > > >> [DISCUSS]
> > >  thread.
> > >  It would be great to clarify those first before concluding this
> > vote.
> > > 
> > >  Thanks,
> > >  Stephan
> > > 
> > > 
> > >  On Fri, Nov 12, 2021 at 7:22 AM Jark Wu  wrote:
> > > 
> > > > +1 (binding)
> > > >
> > > > Thanks for the great work Jingsong!
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Thu, 11 Nov 2021 at 19:41, JING ZHANG 
> > > >> wrote:
> > > >
> > > >> +1 (non-binding)
> > > >>
> > > >> A small suggestion:
> > > >> The message queue is currently used to store the middle layer data
> > > >> of the streaming data warehouse. We hope to use the built-in dynamic
> > > >> table storage to store those middle layers.
> > > >> But the middle data of the streaming data warehouse is often provided
> > > >> to all business teams in a company. Some teams have not used Apache
> > > >> Flink as their compute engine yet. In order to continue serving those
> > > >> teams, the data in the built-in dynamic table storage may need to be
> > > >> copied to a message queue again.
> > > >> If *the built-in storage could provide the same consumer API as the
> > > >> commonly used message queues*, data copying may be avoided. So the
> > > >> built-in dynamic table storage may be promoted faster in the
> > > >> streaming data warehouse business.
> > > >>
> > > >> Best regards,
> > > >> Jing Zhang
> > > >>
> > > >> Yufei Zhang  于2021年11月11日周四 上午9:34写道:
> > > >>
> > > >>> Hi,
> > > >>>
> > > >>> +1 (non-binding)
> > > >>>
> > > >>> Very interesting design. I saw a lot of discussion on the
> generic
> > > >>> interface design, good to know it will address extensibility.
> > > >>>
> > > >>> Cheers,
> > > >>> Yufei
> > > >>>
> > > >>>
> > > >>> On 2021/11/10 02:51:55 Jingsong Li wrote:
> > >  Hi everyone,
> > > 
> > >  Thanks for all the feedback so far. Based on the discussion[1]
> > we
> > > > seem
> > >  to have consensus, so I would like to start a vote on FLIP-188
> > for
> > >  which the FLIP has now also been updated[2].
> > > 
> > >  The vote will last for at least 72 hours (Nov 13th 3:00 GMT)
> > unless
> > >  there is an objection or insufficient votes.
> > > 
> > >  [1]
> > > >> 

Re: [DISCUSS] FLIP-194: Introduce the JobResultStore

2021-11-30 Thread Matthias Pohl
Hi Xintong,
your observation is correct. We probably didn't address this in the FLIP
explicitly enough. We planned to include it in the HighAvailabilityServices
analogously to the RunningJobRegistry (and replace the RunningJobRegistry
by the JobResultStore in the end).

One additional thing, I want to point out: The JobResultStore and the
JobGraphStore have closely-related lifecycles. Both provide information for
the Dispatcher's initialization. Therefore, we're planning to initialize
the JobResultStore in the same way, the JobGraphStore is initialized in
the DispatcherLeaderProcessFactory and referenced
in DispatcherLeaderProcess. The Dispatcher will get the information about
recovered JobGraphs (as it's currently done on master) and the JobResult of
globally-terminated jobs that are still marked as "dirty" by the
JobResultStore. The cleanup based on the latter ones will then happen in
the Dispatcher.

Matthias

On Tue, Nov 30, 2021 at 3:28 AM Xintong Song  wrote:

> Thanks David, Matthias and Mika,
>
> I like this FLIP in the way it handles potential re-execution and resource
> leaks due to clean-up failures.
>
> I have one question: Why is this JobResultStore not part of the high
> availability services? Or ask differently, are there cases that we only
> need the HA services but not JobResultStore, or vice versa?
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Nov 30, 2021 at 9:19 AM Kurt Young  wrote:
>
>> Hi,
>>
>> I didn't fully read the FLIP but the name somehow confused me. My first
>> impression of
>> seeing this is we are providing some storage for job execution results,
>> like the one
>> returned with accumulators in batch mode. Would a name like
>> "JobStautsStore" be more
>> appropriate?
>>
>> Best,
>> Kurt
>>
>>
>> On Mon, Nov 29, 2021 at 8:22 PM Zhu Zhu  wrote:
>>
>> > Thanks for drafting this FLIP, Matthias, Mika and David.
>> >
>> > I like the proposed JobResultStore. Besides addressing the problem of
>> > re-executing finished jobs, it's also an important step towards HA of
>> > multi-job Flink applications.
>> >
>> > I have one question that, in the "Cleanup" section, it shows that the
>> > JobMaster is responsible for cleaning up
>> CheckpointCounter/CheckpointStore.
>> > Does this mean Flink will have to re-create
>> > JobMaster/Scheduler/ExecutionGraph for a terminated job to do the
>> cleanup?
>> > If so, this can be heavy in certain cases because the ExecutionGraph
>> > creation may conduct connector initialization. So I'm thinking whether
>> it's
>> > possible to make CheckpointCounter/CheckpointStore a component of
>> > Dispatcher?
>> >
>> > Thanks,
>> > Zhu
>> >
>> > Till Rohrmann  于2021年11月27日周六 上午1:29写道:
>> >
>> > > Thanks for creating this FLIP Matthias, Mika and David.
>> > >
>> > > I think the JobResultStore is an important piece for fixing Flink's
>> last
>> > > high-availability problem (afaik). Once we have this piece in place,
>> > users
>> > > no longer risk re-executing a successfully completed job.
>> > >
>> > > I have one comment concerning breaking interfaces:
>> > >
>> > > If we don't want to break interfaces, then we could keep the
>> > > HighAvailabilityServices.getRunningJobsRegistry() method and add a
>> > default
>> > > implementation for HighAvailabilityServices.getJobResultStore(). We
>> could
>> > > then deprecate the former method and then remove it in the subsequent
>> > > release (1.16).
>> > >
>> > > Apart from that, +1 for the FLIP.
>> > >
>> > > Cheers,
>> > > Till
>> > >
>> > > On Wed, Nov 17, 2021 at 6:05 PM David Morávek 
>> wrote:
>> > >
>> > > > Hi everyone,
>> > > >
>> > > > Matthias, Mika and I want to start a discussion about introduction
>> of a
>> > > new
>> > > > Flink component, the *JobResultStore*.
>> > > >
>> > > > The main motivation is to address shortcomings of the
>> > > *RunningJobsRegistry*
>> > > > and surpass it with the new component. These shortcomings have been
>> > first
>> > > > described in FLINK-11813 [1].
>> > > >
>> > > > This change should improve the overall stability of the JobManager's
>> > > > components and address the race conditions in some of the fail over
>> > > > scenarios during the job cleanup lifecycle.
>> > > >
>> > > > It should also help to ensure that Flink doesn't leave any uncleaned
>> > > > resources behind.
>> > > >
>> > > > We've prepared a FLIP-194 [2], which outlines the design and
>> reasoning
>> > > > behind this new component.
>> > > >
>> > > > [1] https://issues.apache.org/jira/browse/FLINK-11813
>> > > > [2]
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195726435
>> > > >
>> > > > We're looking forward to your feedback ;)
>> > > >
>> > > > Best,
>> > > > Matthias, Mika and David
>> > > >
>> > >
>> >
>
>


Re: [VOTE] FLIP-188 Introduce Built-in Dynamic Table Storage

2021-11-30 Thread Kurt Young
+1 from my side.

Best,
Kurt


On Tue, Nov 30, 2021 at 5:12 PM Jingsong Li  wrote:

> Hi everyone,
>
> Many thanks to Stephan and Timo, this makes the design of FLIP much
> clearer and more reliable.
>
> I request that you can take another look at the updated FLIP and
> please respond directly if you have feedback.
>
> (I will contact binding voters directly to confirm)
>
> Best,
> Jingsong
>
> On Tue, Nov 30, 2021 at 4:32 PM Timo Walther  wrote:
> >
> > Thanks for the healthy discussion. Also +1 from my side for this FLIP.
> >
> > Thanks,
> > Timo
> >
> > On 24.11.21 19:05, Stephan Ewen wrote:
> > > Thanks for all the details and explanation.
> > >
> > > With the conclusion of the discussion, also +1 from my side for this
> FLIP
> > >
> > > On Sat, Nov 13, 2021 at 12:23 PM Jingsong Li 
> wrote:
> > >
> > >> Thanks Stephan and Timo, I have had a rough look at your replies. They are
> > >> all valuable opinions. I will take time to discuss, explain and
> > >> improve them.
> > >>
> > >> Hi Timo,
> > >>> At least a final "I will start the vote soon. Last call for
> comments."
> > >> would have been nice.
> > >>
> > >> I replied in the DISCUSS thread that we began to vote. If there are
> > >> supplementary comments or reply "pause voting first, I will reply
> > >> later", we can suspend or cancel the voting at any time.
> > >> I understand why the FLIP must take three days to vote, so that more
> > >> people can see it and put forward their opinions.
> > >>
> > >> Best,
> > >> Jingsong
> > >>
> > >> On Sat, Nov 13, 2021 at 1:27 AM Timo Walther 
> wrote:
> > >>>
> > >>> Hi everyone,
> > >>>
> > >>> even though the DISCUSS thread was open for 2 weeks. I have the
> feeling
> > >>> that the VOTE was initiated too quickly. At least a final "I will
> start
> > >>> the vote soon. Last call for comments." would have been nice.
> > >>>
> > >>> I also added some comments in the DISCUSS thread. Let's hope we can
> > >>> resolve those soon.
> > >>>
> > >>> Regards,
> > >>> Timo
> > >>>
> > >>> On 12.11.21 16:36, Stephan Ewen wrote:
> >  Hi all!
> > 
> >  I have a few questions on the design still, posted those in the
> > >> [DISCUSS]
> >  thread.
> >  It would be great to clarify those first before concluding this
> vote.
> > 
> >  Thanks,
> >  Stephan
> > 
> > 
> >  On Fri, Nov 12, 2021 at 7:22 AM Jark Wu  wrote:
> > 
> > > +1 (binding)
> > >
> > > Thanks for the great work Jingsong!
> > >
> > > Best,
> > > Jark
> > >
> > > On Thu, 11 Nov 2021 at 19:41, JING ZHANG 
> > >> wrote:
> > >
> > >> +1 (non-binding)
> > >>
> > >> A small suggestion:
> > >> The message queue is currently used to store the middle layer data
> > >> of the streaming data warehouse. We hope to use the built-in dynamic
> > >> table storage to store those middle layers.
> > >> But the middle data of the streaming data warehouse is often provided
> > >> to all business teams in a company. Some teams have not used Apache
> > >> Flink as their compute engine yet. In order to continue serving those
> > >> teams, the data in the built-in dynamic table storage may need to be
> > >> copied to a message queue again.
> > >> If *the built-in storage could provide the same consumer API as the
> > >> commonly used message queues*, data copying may be avoided. So the
> > >> built-in dynamic table storage may be promoted faster in the
> > >> streaming data warehouse business.
> > >>
> > >> Best regards,
> > >> Jing Zhang
> > >>
> > >> Yufei Zhang  于2021年11月11日周四 上午9:34写道:
> > >>
> > >>> Hi,
> > >>>
> > >>> +1 (non-binding)
> > >>>
> > >>> Very interesting design. I saw a lot of discussion on the generic
> > >>> interface design, good to know it will address extensibility.
> > >>>
> > >>> Cheers,
> > >>> Yufei
> > >>>
> > >>>
> > >>> On 2021/11/10 02:51:55 Jingsong Li wrote:
> >  Hi everyone,
> > 
> >  Thanks for all the feedback so far. Based on the discussion[1]
> we
> > > seem
> >  to have consensus, so I would like to start a vote on FLIP-188
> for
> >  which the FLIP has now also been updated[2].
> > 
> >  The vote will last for at least 72 hours (Nov 13th 3:00 GMT)
> unless
> >  there is an objection or insufficient votes.
> > 
> >  [1]
> > >> https://lists.apache.org/thread/tqyn1cro5ohl3c3fkjb1zvxbo03sofn7
> >  [2]
> > >>>
> > >>
> > >
> > >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> > 
> >  Best,
> >  Jingsong
> > 
> > >>>
> > >>
> > >
> > 
> > >>>
> > >>
> > >>
> > >> --
> > >> Best, Jingsong Lee
> > >>
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Re: [VOTE] FLIP-188 Introduce Built-in Dynamic Table Storage

2021-11-30 Thread Jingsong Li
Hi everyone,

Many thanks to Stephan and Timo, this makes the design of FLIP much
clearer and more reliable.

I request that you can take another look at the updated FLIP and
please respond directly if you have feedback.

(I will contact binding voters directly to confirm)

Best,
Jingsong

On Tue, Nov 30, 2021 at 4:32 PM Timo Walther  wrote:
>
> Thanks for the healthy discussion. Also +1 from my side for this FLIP.
>
> Thanks,
> Timo
>
> On 24.11.21 19:05, Stephan Ewen wrote:
> > Thanks for all the details and explanation.
> >
> > With the conclusion of the discussion, also +1 from my side for this FLIP
> >
> > On Sat, Nov 13, 2021 at 12:23 PM Jingsong Li  wrote:
> >
> >> Thanks Stephan and Timo, I have had a rough look at your replies. They are
> >> all valuable opinions. I will take time to discuss, explain and
> >> improve them.
> >>
> >> Hi Timo,
> >>> At least a final "I will start the vote soon. Last call for comments."
> >> would have been nice.
> >>
> >> I replied in the DISCUSS thread that we began to vote. If there are
> >> supplementary comments or reply "pause voting first, I will reply
> >> later", we can suspend or cancel the voting at any time.
> >> I understand why the FLIP must take three days to vote, so that more
> >> people can see it and put forward their opinions.
> >>
> >> Best,
> >> Jingsong
> >>
> >> On Sat, Nov 13, 2021 at 1:27 AM Timo Walther  wrote:
> >>>
> >>> Hi everyone,
> >>>
> >>> even though the DISCUSS thread was open for 2 weeks. I have the feeling
> >>> that the VOTE was initiated too quickly. At least a final "I will start
> >>> the vote soon. Last call for comments." would have been nice.
> >>>
> >>> I also added some comments in the DISCUSS thread. Let's hope we can
> >>> resolve those soon.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>> On 12.11.21 16:36, Stephan Ewen wrote:
>  Hi all!
> 
>  I have a few questions on the design still, posted those in the
> >> [DISCUSS]
>  thread.
>  It would be great to clarify those first before concluding this vote.
> 
>  Thanks,
>  Stephan
> 
> 
>  On Fri, Nov 12, 2021 at 7:22 AM Jark Wu  wrote:
> 
> > +1 (binding)
> >
> > Thanks for the great work Jingsong!
> >
> > Best,
> > Jark
> >
> > On Thu, 11 Nov 2021 at 19:41, JING ZHANG 
> >> wrote:
> >
> >> +1 (non-binding)
> >>
> >> A small suggestion:
> >> The message queue is currently used to store the middle layer data
> >> of the streaming data warehouse. We hope to use the built-in dynamic
> >> table storage to store those middle layers.
> >> But the middle data of the streaming data warehouse is often provided
> >> to all business teams in a company. Some teams have not used Apache
> >> Flink as their compute engine yet. In order to continue serving those
> >> teams, the data in the built-in dynamic table storage may need to be
> >> copied to a message queue again.
> >> If *the built-in storage could provide the same consumer API as the
> >> commonly used message queues*, data copying may be avoided. So the
> >> built-in dynamic table storage may be promoted faster in the
> >> streaming data warehouse business.
> >>
> >> Best regards,
> >> Jing Zhang
> >>
> >> Yufei Zhang  于2021年11月11日周四 上午9:34写道:
> >>
> >>> Hi,
> >>>
> >>> +1 (non-binding)
> >>>
> >>> Very interesting design. I saw a lot of discussion on the generic
> >>> interface design, good to know it will address extensibility.
> >>>
> >>> Cheers,
> >>> Yufei
> >>>
> >>>
> >>> On 2021/11/10 02:51:55 Jingsong Li wrote:
>  Hi everyone,
> 
>  Thanks for all the feedback so far. Based on the discussion[1] we
> > seem
>  to have consensus, so I would like to start a vote on FLIP-188 for
>  which the FLIP has now also been updated[2].
> 
>  The vote will last for at least 72 hours (Nov 13th 3:00 GMT) unless
>  there is an objection or insufficient votes.
> 
>  [1]
> >> https://lists.apache.org/thread/tqyn1cro5ohl3c3fkjb1zvxbo03sofn7
>  [2]
> >>>
> >>
> >
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> 
>  Best,
>  Jingsong
> 
> >>>
> >>
> >
> 
> >>>
> >>
> >>
> >> --
> >> Best, Jingsong Lee
> >>
> >
>


-- 
Best, Jingsong Lee


Re: [DISCUSS] FLIP-194: Introduce the JobResultStore

2021-11-30 Thread Matthias Pohl
Hi Zhu Zhu,
thanks for your reply. Your concern is valid. Our goal is to only touch the
CompletedCheckpointStore and CheckpointIDCounter without instantiating
JobMaster/Scheduler/ExecutionGraph. We would have to initialize these
classes (and for the CompletedCheckpointStore reload the
CompletedCheckpoints) in order to rerun their shutdown functionality. That
can be achieved through the CheckpointRecoveryFactory which is provided by
the HighAvailabilityServices. The shutdown call itself only requires the
JobID and the JobStatus, both being provided by the JobResultStore.
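
A loose sketch of that cleanup path (the factory method names and
parameters are approximate, since the exact signatures have been changing
between releases):

// Rebuild only the two checkpoint components for the terminated job and
// re-run their shutdown, using the terminal status recovered from the
// JobResultStore; no JobMaster/Scheduler/ExecutionGraph is instantiated.
CheckpointRecoveryFactory factory = highAvailabilityServices.getCheckpointRecoveryFactory();

CompletedCheckpointStore checkpointStore =
        factory.createRecoveredCompletedCheckpointStore(jobId /* , ... */);
CheckpointIDCounter idCounter = factory.createCheckpointIDCounter(jobId);

checkpointStore.shutdown(JobStatus.FINISHED, checkpointsCleaner);
idCounter.shutdown(JobStatus.FINISHED);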

We're not planning to touch the process which is currently available on
master for the case where the failover happens before the job was marked as
globally-terminated. That means that the JobMaster still owns instances of
the CompletedCheckpointStore and CheckpointIDCounter via the scheduler.
Therefore, the JobMaster is responsible for cleaning up these components in
that case.

Matthias

On Mon, Nov 29, 2021 at 1:21 PM Zhu Zhu  wrote:

> Thanks for drafting this FLIP, Matthias, Mika and David.
>
> I like the proposed JobResultStore. Besides addressing the problem of
> re-executing finished jobs, it's also an important step towards HA of
> multi-job Flink applications.
>
> I have one question that, in the "Cleanup" section, it shows that the
> JobMaster is responsible for cleaning up CheckpointCounter/CheckpointStore.
> Does this mean Flink will have to re-create
> JobMaster/Scheduler/ExecutionGraph for a terminated job to do the cleanup?
> If so, this can be heavy in certain cases because the ExecutionGraph
> creation may conduct connector initialization. So I'm thinking whether it's
> possible to make CheckpointCounter/CheckpointStore a component of
> Dispatcher?
>
> Thanks,
> Zhu
>
> Till Rohrmann  于2021年11月27日周六 上午1:29写道:
>
>> Thanks for creating this FLIP Matthias, Mika and David.
>>
>> I think the JobResultStore is an important piece for fixing Flink's last
>> high-availability problem (afaik). Once we have this piece in place, users
>> no longer risk re-executing a successfully completed job.
>>
>> I have one comment concerning breaking interfaces:
>>
>> If we don't want to break interfaces, then we could keep the
>> HighAvailabilityServices.getRunningJobsRegistry() method and add a default
>> implementation for HighAvailabilityServices.getJobResultStore(). We could
>> then deprecate the former method and then remove it in the subsequent
>> release (1.16).
>>
>> Apart from that, +1 for the FLIP.
>>
>> Cheers,
>> Till
>>
>> On Wed, Nov 17, 2021 at 6:05 PM David Morávek  wrote:
>>
>> > Hi everyone,
>> >
>> > Matthias, Mika and I want to start a discussion about introduction of a
>> new
>> > Flink component, the *JobResultStore*.
>> >
>> > The main motivation is to address shortcomings of the
>> *RunningJobsRegistry*
>> > and surpass it with the new component. These shortcomings have been
>> first
>> > described in FLINK-11813 [1].
>> >
>> > This change should improve the overall stability of the JobManager's
>> > components and address the race conditions in some of the fail over
>> > scenarios during the job cleanup lifecycle.
>> >
>> > It should also help to ensure that Flink doesn't leave any uncleaned
>> > resources behind.
>> >
>> > We've prepared a FLIP-194 [2], which outlines the design and reasoning
>> > behind this new component.
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-11813
>> > [2]
>> >
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195726435
>> >
>> > We're looking forward to your feedback ;)
>> >
>> > Best,
>> > Matthias, Mika and David
>> >
>
>


[jira] [Created] (FLINK-25108) Flink Kerberos authentication fails when the environment variable HADOOP_CONF_DIR is set

2021-11-30 Thread aresyhzhang (Jira)
aresyhzhang created FLINK-25108:
---

 Summary: Flink Kerberos authentication fails when the environment 
variable HADOOP_CONF_DIR is set
 Key: FLINK-25108
 URL: https://issues.apache.org/jira/browse/FLINK-25108
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes
Affects Versions: 1.14.0
Reporter: aresyhzhang


version:
flink version:1.14.0
java version:1.8
run mode:flink native k8s session

problem:
When I use Flink SQL batch mode to read the data in the Hive table (we 
submit the job using a Flink native K8s session), the following appears:
Caused by: java.io.IOException: Can't get Master Kerberos principal for use as 
renewer
at 
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:116)
 ~[hadoop.jar:2.6.5-10.0]
at 
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
 ~[hadoop.jar:2.6.5-10.0]
at 
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
 ~[hadoop.jar:2.6.5-10.0]
at 
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:205) 
~[hadoop.jar:2.6.5-10.0].
I think this exception is caused by the absence of yarn-site.xml under the 
environment variable HADOOP_CONF_DIR. The default value of the environment 
variable is: /opt/hadoop/conf
I tried to change the value of HADOOP_CONF_DIR by specifying it in 
pod-template.yaml:
- name: HADOOP_CONF_DIR
  value: "/etc/hive/conf"
because I have stored yarn-site.xml under /etc/hive/conf, but it is always 
overwritten by "/opt/hadoop/conf".

Remark:
1. When I look at the source code of getHadoopConfigurationFileItems in the 
HadoopConfMountDecorator class, I find:
final List<String> expectedFileNames = new ArrayList<>();
expectedFileNames.add("core-site.xml");
expectedFileNames.add("hdfs-site.xml");
Only core-site.xml and hdfs-site.xml are mounted here, but yarn-site.xml is 
not, which leads to the Kerberos authentication failure.
Should I add another line of code,
expectedFileNames.add("yarn-site.xml");
to pass Kerberos authentication?

2. Or is there any other way to actually change the value of the environment 
variable HADOOP_CONF_DIR so that it points to the "/etc/hive/conf" I want 
instead of "/opt/hadoop/conf", in order to pass Kerberos authentication?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-11-30 Thread Marios Trivyzas
I have a question regarding the `COMPILE PLAN OVERWRITE`. If we choose to go
with the config option instead,
that doesn't provide the flexibility to overwrite certain plans but not
others, since the config applies globally, isn't that
something to consider?
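
To make the trade-off concrete, a hypothetical sketch of the two styles
under discussion (both the OVERWRITE syntax and the config key are drafts
from the FLIP discussion, not final):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Per-statement: the OVERWRITE clause replaces only this one plan file.
env.executeSql(
    "COMPILE PLAN '/plans/q1.json' OVERWRITE FOR INSERT INTO sink1 SELECT * FROM src");

// Global: a config option would apply to every COMPILE PLAN in the
// session, so individual plans could no longer opt out selectively.
env.getConfig().getConfiguration()
        .setString("table.plan.force-recompile", "true"); // hypothetical key
env.executeSql("COMPILE PLAN '/plans/q2.json' FOR INSERT INTO sink2 SELECT * FROM src");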

On Mon, Nov 29, 2021 at 10:15 AM Marios Trivyzas  wrote:

> Hi Timo!
>
> Thanks a lot for taking all that time and effort to put together this
> proposal!
>
> Regarding:
> > For simplification of the design, we assume that upgrades use a step size
> of a single
> minor version. We don't guarantee skipping minor versions (e.g. 1.11 to
> 1.14).
>
> I think that for this first step we should make it absolutely clear to the
> users that they would need to go through all intermediate versions
> to end up with the target version they wish. If we are to support skipping
> versions in the future, i.e. upgrade from 1.14 to 1.17, this means
> that we need to have a testing infrastructure in place that would test all
> possible combinations of version upgrades, i.e. from 1.14 to 1.15,
> from 1.14 to 1.16 and so forth, while still testing and of course
> supporting all the upgrades from the previous minor version.
>
> I very much like the idea of introducing HINTS to define some behaviour in
> the programs!
> - the hints live together with the sql statements and consequently the
> (JSON) plans.
> - If multiple queries are involved in a program, each one of them can
> define its own config (regarding plan optimisation, not null enforcement,
> etc)
>
> I agree with Francesco on his argument regarding the *JSON* plan. I
> believe we should already provide flexibility here, since (who knows) in
> the future
> a JSON plan might not fulfil the desired functionality.
>
> I also agree that we need some very obvious way (i.e. not log entry) to
> show the users that their program doesn't support version upgrades, and
> prevent them from being negatively surprised in the future, when trying to
> upgrade their production pipelines.
>
> This is an implementation detail, but I'd like to add that there should be
> some good logging in place when the upgrade is taking place, to be
> able to track every restoration action, and help debug any potential
> issues arising from that.
>
>
>
>
>
> On Fri, Nov 26, 2021 at 2:54 PM Till Rohrmann 
> wrote:
>
>> Thanks for writing this FLIP Timo. I think this will be a very important
>> improvement for Flink and our SQL user :-)
>>
>> Similar to Francesco I would like to understand the statement
>>
>> > For simplification of the design, we assume that upgrades use a step
>> size
>> of a single
>> minor version. We don't guarantee skipping minor versions (e.g. 1.11 to
>> 1.14).
>>
>> a bit better. Is it because Flink does not guarantee that a savepoint
>> created by version 1.x can be directly recovered by version 1.y with x + 1
>> < y but users might have to go through a cascade of upgrades? From how I
>> understand your proposal, the compiled plan won't be changed after being
>> written initially. Hence, I would assume that for the plan alone Flink
>> will
>> have to give backwards compatibility guarantees for all versions. Am I
>> understanding this part correctly?
>>
>> Cheers,
>> Till
>>
>> On Thu, Nov 25, 2021 at 4:55 PM Francesco Guardiani <
>> france...@ververica.com>
>> wrote:
>>
>> > Hi Timo,
>> >
>> > Thanks for putting this amazing work together, I have some
>> > considerations/questions
>> > about the FLIP:
>> > *Proposed changes #6*: Other than defining this rule of thumb, we must
>> > also make sure
>> > that compiling plans with these objects that cannot be serialized in the
>> > plan must fail hard,
>> > so users don't bite themselves with such issues, or at least we need to
>> > output warning
>> > logs. In general, whenever the user is trying to use the CompiledPlan
>> APIs
>> > and at the same
>> > time, they're trying to do something "illegal" for the plan, we should
>> > immediately either
>> > log or fail depending on the issue, in order to avoid any surprises once
>> > the user upgrades.
>> > I would also say the same for things like registering a function,
>> > registering a DataStream,
>> > and for every other thing which won't end up in the plan, we should log
>> > such info to the
>> > user by default.
>> >
>> > *General JSON Plan Assumptions #9:* When thinking to connectors and
>> > formats, I think
>> > it's reasonable to assume and keep out of the feature design that no
>> > feature/ability can
>> > deleted from a connector/format. I also don't think new
>> features/abilities
>> > can influence
>> > this FLIP as well, given the plan is static, so if for example,
>> > MyCoolTableSink in the next
>> > flink version implements SupportsProjectionsPushDown, then it shouldn't
>> be
>> > a problem
>> > for the upgrade story since the plan is still configured as computed
>> from
>> > the previous flink
>> > version. What worries me is breaking changes, in particular behavioural
>> > changes that
>> > might happen in 

Re: [VOTE] FLIP-188 Introduce Built-in Dynamic Table Storage

2021-11-30 Thread Timo Walther

Thanks for the healthy discussion. Also +1 from my side for this FLIP.

Thanks,
Timo

On 24.11.21 19:05, Stephan Ewen wrote:

Thanks for all the details and explanation.

With the conclusion of the discussion, also +1 from my side for this FLIP

On Sat, Nov 13, 2021 at 12:23 PM Jingsong Li  wrote:


Thanks Stephan and Timo, I have taken a rough look at your replies. They
are all valuable opinions. I will take time to discuss, explain and
improve them.

Hi Timo,

At least a final "I will start the vote soon. Last call for comments."
would have been nice.

I replied in the DISCUSS thread that we had begun to vote. If there are
supplementary comments, or someone replies "pause the voting first, I will
reply later", we can suspend or cancel the vote at any time.
I understand why the FLIP vote must stay open for three days: so that more
people can see it and put forward their opinions.

Best,
Jingsong

On Sat, Nov 13, 2021 at 1:27 AM Timo Walther  wrote:


Hi everyone,

Even though the DISCUSS thread was open for 2 weeks, I have the feeling
that the VOTE was initiated too quickly. At least a final "I will start
the vote soon. Last call for comments." would have been nice.

I also added some comments in the DISCUSS thread. Let's hope we can
resolve those soon.

Regards,
Timo

On 12.11.21 16:36, Stephan Ewen wrote:

Hi all!

I have a few questions on the design still, posted those in the [DISCUSS]
thread.
It would be great to clarify those first before concluding this vote.

Thanks,
Stephan


On Fri, Nov 12, 2021 at 7:22 AM Jark Wu  wrote:


+1 (binding)

Thanks for the great work Jingsong!

Best,
Jark

On Thu, 11 Nov 2021 at 19:41, JING ZHANG  wrote:



+1 (non-binding)

A small suggestion:
The message queue is currently used to store the middle-layer data of the
streaming data warehouse. We hope to use the built-in dynamic table storage
to store those middle layers. But that middle-layer data is often provided
to all business teams in a company, and some teams do not use Apache Flink
as their compute engine yet. In order to continue serving those teams, the
data in the built-in dynamic table storage may need to be copied to the
message queue again.
If *the built-in storage could provide the same consumer API as the commonly
used message queues*, this data copying could be avoided, and the built-in
dynamic table storage may be adopted faster in the streaming data warehouse
business.

Best regards,
Jing Zhang

Yufei Zhang  wrote on Thu, Nov 11, 2021 at 9:34 AM:


Hi,

+1 (non-binding)

Very interesting design. I saw a lot of discussion on the generic
interface design, good to know it will address extensibility.

Cheers,
Yufei


On 2021/11/10 02:51:55 Jingsong Li wrote:

Hi everyone,

Thanks for all the feedback so far. Based on the discussion[1] we seem
to have consensus, so I would like to start a vote on FLIP-188, for
which the FLIP has now also been updated[2].

The vote will last for at least 72 hours (Nov 13th 3:00 GMT) unless
there is an objection or insufficient votes.

[1] https://lists.apache.org/thread/tqyn1cro5ohl3c3fkjb1zvxbo03sofn7
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage


Best,
Jingsong


--
Best, Jingsong Lee


[jira] [Created] (FLINK-25107) GlueSchemaRegistryAvroKinesisITCase and GlueSchemaRegistryJsonKinesisITCase are skipped on AzureCI but fail when enabled

2021-11-30 Thread Matthias (Jira)
Matthias created FLINK-25107:


 Summary: GlueSchemaRegistryAvroKinesisITCase and 
GlueSchemaRegistryJsonKinesisITCase are skipped on AzureCI but fail when enabled
 Key: FLINK-25107
 URL: https://issues.apache.org/jira/browse/FLINK-25107
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.15.0
Reporter: Matthias


[GlueSchemaRegistryAvroKinesisITCase|https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-glue-schema-registry-avro-test/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java]
 and 
[GlueSchemaRegistryJsonKinesisITCase|https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-glue-schema-registry-json-test/src/test/java/org/apache/flink/glue/schema/registry/test/json/GlueSchemaRegistryJsonKinesisITCase.java],
 are not executed on Azure CI runs, because the access key and secret key env 
variables are not present, see e.g. [this 
run|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=26986=logs=af184cdd-c6d8-5084-0b69-7e9c67b35f7a=160c9ae5-96fd-516e-1c91-deb81f59292a=15852].

Then, during recent testing on GitHub Actions, we noticed that the tests don't 
work even if the env variables are present because AWS expects different 
variable names (it expects AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY). See 
[this 
run|https://github.com/ververica/flink/runs/4301833493?check_suite_focus=true#step:13:17885].
{code:java}
Nov 23 18:40:46 Caused by: 
software.amazon.awssdk.core.exception.SdkClientException: Unable to load 
credentials from any of the providers in the chain 
AwsCredentialsProviderChain(credentialsProviders=[SystemPropertyCredentialsProvider(),
 EnvironmentVariableCredentialsProvider(), 
WebIdentityTokenCredentialsProvider(), ProfileCredentialsProvider(), 
ContainerCredentialsProvider(), InstanceProfileCredentialsProvider()]) : 
[SystemPropertyCredentialsProvider(): Unable to load credentials from system 
settings. Access key must be specified either via environment variable 
(AWS_ACCESS_KEY_ID) or system property (aws.accessKeyId)., 
EnvironmentVariableCredentialsProvider(): Unable to load credentials from 
system settings. Access key must be specified either via environment variable 
(AWS_ACCESS_KEY_ID) or system property (aws.accessKeyId)., 
WebIdentityTokenCredentialsProvider(): Either the environment variable 
AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must 
be set., ProfileCredentialsProvider(): Profile file contained no credentials 
for profile 'default': ProfileFile(profiles=[]), 
ContainerCredentialsProvider(): Cannot fetch credentials from container - 
neither AWS_CONTAINER_CREDENTIALS_FULL_URI or 
AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variables are set., 
InstanceProfileCredentialsProvider(): The requested metadata is not found at 
http://169.254.169.254/latest/meta-data/iam/security-credentials/]
{code}
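
For reference, a minimal sketch (the class name is ours; it assumes the AWS
SDK v2 on the classpath) that exercises the same default provider chain, so a
CI image can be sanity-checked before enabling the ITCases:
{code:java}
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

public class CredentialsSmokeTest {
    public static void main(String[] args) {
        // Fails with the SdkClientException above unless AWS_ACCESS_KEY_ID /
        // AWS_SECRET_ACCESS_KEY (or another provider in the chain) is set.
        AwsCredentials credentials =
                DefaultCredentialsProvider.create().resolveCredentials();
        System.out.println("Resolved access key id: " + credentials.accessKeyId());
    }
}
{code}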
Finally, even with correct env variable naming, the test still fails because of 
an assertion error. It looks like only the first record ever makes it to the 
results. See [this 
run|https://github.com/ververica/flink/runs/4315084463?check_suite_focus=true#step:13:5317]
 for the error, also posted here for convenience:
{code:java}
Nov 24 18:00:55 java.lang.AssertionError: Results received from 
'gsr_json_output_stream': 
[JsonDataWithSchema(schema={"$id":"https://example.com/address.schema.json","$schema":"http://json-schema.org/draft-07/schema#","type":"object","properties":{"f1":{"type":"string"},"f2":{"type":"integer","maximum":1}}},
 payload={"f1":"olympic","f2":2020})] expected:<8> but was:<1>
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25106) Support tombstone messages in FLINK's "kafka" connector

2021-11-30 Thread Varun Yeligar (Jira)
Varun Yeligar created FLINK-25106:
-

 Summary: Support tombstone messages in FLINK's "kafka" connector
 Key: FLINK-25106
 URL: https://issues.apache.org/jira/browse/FLINK-25106
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Varun Yeligar


Currently, Flink's "kafka" connector ignores all tombstone messages, 
whereas the "upsert-kafka" connector supports tombstone messages and sets the 
row kind to RowKind.DELETE ([Code 
Link|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L126]).

I wanted to know whether it is feasible to support tombstone messages in the 
"kafka" connector by setting all the value fields to NULL and the RowKind to 
DELETE. I could also raise a PR with the respective changes if required.
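
For illustration, a minimal sketch of the proposed handling (the helper name
and signature are invented for this ticket; GenericRowData, RowKind and
Collector are existing Flink classes):
{code:java}
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical helper: instead of skipping a tombstone, emit a DELETE row
// whose value fields are all NULL, mirroring what "upsert-kafka" does.
final class TombstoneHandling {
    static boolean emitDeleteOnTombstone(
            ConsumerRecord<byte[], byte[]> record,
            int fieldCount,
            Collector<RowData> out) {
        if (record.value() != null) {
            return false; // not a tombstone; caller deserializes normally
        }
        // All fields of a fresh GenericRowData default to null.
        GenericRowData delete = new GenericRowData(fieldCount);
        delete.setRowKind(RowKind.DELETE);
        out.collect(delete);
        return true;
    }
}
{code}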



--
This message was sent by Atlassian Jira
(v8.20.1#820001)