Flink JDBC connect with secret

2021-10-22 Thread Qihua Yang
Hi,

We plan to use the JDBC SQL connector to read from and write to a database. I see
that the JDBC connector uses a username and password. Is it possible to use a
secret (*.crt) to access the database? I didn't find any guideline on how to do
that. How do I configure JDBC with a secret?

import org.apache.flink.connector.jdbc.JdbcConnectionOptions

val jdbc: JdbcConnectionOptions =
  new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl(url)
    .withDriverName("org.postgresql.Driver")
    .withUsername(userName)
    .withPassword(password)
    .build()

Thanks,
Qihua
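
One possible approach, sketched here under assumptions rather than taken from the thread: the PostgreSQL JDBC driver accepts certificate settings as URL parameters (`sslmode`, `sslrootcert`, `sslcert`, `sslkey`), so the certificate files can be referenced in the URL that is handed to `withUrl(...)`. The class name, host, and file paths below are hypothetical, and the sketch assumes the paths contain no characters that need URL escaping.

```java
public class PgSslUrl {

    /**
     * Appends PostgreSQL JDBC SSL parameters to a base URL that has no
     * query string yet. All paths are placeholders for mounted secret files.
     */
    static String withClientCert(String baseUrl, String rootCert,
                                 String clientCert, String clientKey) {
        return baseUrl + "?" + String.join("&",
                "sslmode=verify-full",        // verify server certificate and host name
                "sslrootcert=" + rootCert,    // CA certificate used to verify the server
                "sslcert=" + clientCert,      // client certificate (*.crt)
                "sslkey=" + clientKey);       // client private key
    }

    public static void main(String[] args) {
        String url = withClientCert(
                "jdbc:postgresql://db.example.com:5432/mydb",
                "/etc/secrets/ca.crt",
                "/etc/secrets/client.crt",
                "/etc/secrets/client.pk8");
        System.out.println(url);
    }
}
```

The resulting string would be passed as `url` to the builder above. Whether a username and password are still required depends on how the database server is configured to authenticate clients.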


Re: High availability data clean up

2021-10-22 Thread Weiqing Yang
Thanks for the replies, Yangze and Vijay!

We are using standalone Flink on K8s (we created a K8s operator to manage
the life cycle of the Flink clusters in session mode). There seems to be no way
for the operator to know when these HA-related ConfigMaps are created (if
the operator could somehow know when they are created, we could add an
ownerRef to them). Please let me know if I missed anything and whether
you have a recommended way to clean up these HA-related data/ConfigMaps when
deleting a Flink cluster.

Best,
Wq

On Thu, Oct 21, 2021 at 11:05 PM Vijay Bhaskar 
wrote:

> In HA mode the ConfigMap is retained after the deployment is deleted:
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/
> (see "High availability data clean up")
>
>
>
> On Fri, Oct 22, 2021 at 8:13 AM Yangze Guo  wrote:
>
>> For application mode, when the job finishes normally or is canceled,
>> the ConfigMaps will be cleaned up.
>> For session mode, when you stop the session through [1], the
>> ConfigMaps will be cleaned up.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#stop-a-running-session-cluster
>>
>> Best,
>> Yangze Guo
>>
>> On Thu, Oct 21, 2021 at 6:37 AM Weiqing Yang 
>> wrote:
>> >
>> >
>> > Hi,
>> >
>> > Per the doc, `kubernetes.jobmanager.owner.reference` can be used to set
>> > up the owners of the job manager Deployment. If the owner is deleted, then
>> > the job manager and its related pods will be deleted. How about the HA
>> > related ConfigMaps? Are they also deleted when deleting the owner of the
>> > job manager Deployment? Per the wiki here, the HA data will be retained
>> > when deleting jobmanager Deployment. If we want to delete these HA related
>> > configMaps as well when deleting the job manager, what is the suggested way
>> > to do that?
>> >
>> > Thanks,
>> > weiqing
>> >
>>
>


Re: SplitEnumeratorContext callAsync() cleanup

2021-10-22 Thread Mason Chen
Hi Fabian,

Here we are: https://issues.apache.org/jira/browse/FLINK-24622 


Feel free to modify the description as I lazily copied and pasted our 
discussion here.

Best,
Mason

> On Oct 22, 2021, at 3:31 AM, Fabian Paul  wrote:
> 
> Hi Mason,
> 
> This seems to be a bug with the current KafkaSource and also the unified 
> Sources in general. Can you open a bug ticket in Jira? I think the enumerator 
> should take care of first joining all the async threads before closing the 
> enumerator.
> 
> Best,
> Fabian



Re: [ANNOUNCE] Apache Flink 1.13.3 released

2021-10-22 Thread Till Rohrmann
Thanks Chesnay and Martijn for managing this release and to everyone who
contributed to it.

Cheers,
Till

On Fri, Oct 22, 2021 at 11:04 AM Yangze Guo  wrote:

> Thank Chesnay, Martijn, and everyone involved!
>
> Best,
> Yangze Guo
>
> On Fri, Oct 22, 2021 at 4:25 PM Yun Tang  wrote:
> >
> > Thanks for Chesnay & Martijn and everyone who made this release happen.
> >
> > Best
> > Yun Tang
> >
> > From: JING ZHANG
> > Sent: Friday, October 22, 2021 10:17
> > To: dev
> > Cc: Martijn Visser; Jingsong Li <jingsongl...@gmail.com>; Chesnay Schepler; user <user@flink.apache.org>
> > Subject: Re: [ANNOUNCE] Apache Flink 1.13.3 released
> >
> > Thank Chesnay, Martijn and every contributor for making this happen!
> >
> >
> > On Fri, Oct 22, 2021 at 12:15 AM Thomas Weise wrote:
> >
> > Thanks for making the release happen!
> >
> > On Thu, Oct 21, 2021 at 5:54 AM Leonard Xu  wrote:
> > >
> > > Thanks to Chesnay & Martijn and everyone who made this release happen.
> > >
> > >
> > > > On Oct 21, 2021, at 20:08, Martijn Visser wrote:
> > > >
> > > > Thank you Chesnay, Leonard and all contributors!
> > > >
> > > > On Thu, 21 Oct 2021 at 13:40, Jingsong Li wrote:
> > > > Thanks, Chesnay & Martijn
> > > >
> > > > 1.13.3 really solves many problems.
> > > >
> > > > Best,
> > > > Jingsong
> > > >
> > > > On Thu, Oct 21, 2021 at 6:46 PM Konstantin Knauf wrote:
> > > > >
> > > > > Thank you, Chesnay & Martijn, for managing this release!
> > > > >
> > > > > On Thu, Oct 21, 2021 at 10:29 AM Chesnay Schepler <ches...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > The Apache Flink community is very happy to announce the release of
> > > > > > Apache Flink 1.13.3, which is the third bugfix release for the Apache
> > > > > > Flink 1.13 series.
> > > > > >
> > > > > > Apache Flink® is an open-source stream processing framework for
> > > > > > distributed, high-performing, always-available, and accurate data
> > > > > > streaming applications.
> > > > > >
> > > > > > The release is available for download at:
> > > > > > https://flink.apache.org/downloads.html
> > > > > >
> > > > > > Please check out the release blog post for an overview of the
> > > > > > improvements for this bugfix release:
> > > > > > https://flink.apache.org/news/2021/10/19/release-1.13.3.html
> > > > > >
> > > > > > The full release notes are available in Jira:
> > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350329
> > > > > >
> > > > > > We would like to thank all contributors of the Apache Flink community
> > > > > > who made this release possible!
> > > > > >
> > > > > > Regards,
> > > > > > Chesnay
> > > > > >
> > > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Konstantin Knauf
> > > > >
> > > > > https://twitter.com/snntrable 
> > > > >
> > > > > https://github.com/knaufk 
> > > >
> > > >
> > > >
> > > > --
> > > > Best, Jingsong Lee
> > >
>


Re: Upgrade from 1.13.1 to 1.13.2/1.13.3 failing

2021-10-22 Thread Chesnay Schepler
The only suggestion I can offer is to take a savepoint with 1.13.1 and 
try to restore from that.


We will investigate the problem in 
https://issues.apache.org/jira/browse/FLINK-24621; currently we don't 
know why you are experiencing this issue.
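
For readers unfamiliar with the `InvalidClassException` in the quoted trace, here is a minimal plain-Java sketch of what the message compares. It is not Flink code and does not explain the root cause, which is still open in the ticket. One plausible reading of the message is that the checkpoint metadata was written by a class version without an explicit `serialVersionUID` (so the JVM derived one), while the class in the 1.13.3 jar declares `serialVersionUID = 1`. The class names below are hypothetical.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialUidDemo {

    /** No explicit serialVersionUID: the JVM derives one from the class structure. */
    static class ImplicitUid implements Serializable {
        int value;
    }

    /** Explicit serialVersionUID: stays the same when the class structure changes. */
    static class ExplicitUid implements Serializable {
        private static final long serialVersionUID = 1L;
        int value;
    }

    /** Returns the UID that Java serialization writes into, and checks against, a stream. */
    static long uidOf(Class<?> cls) {
        return ObjectStreamClass.lookup(cls).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println("implicit: " + uidOf(ImplicitUid.class));
        System.out.println("explicit: " + uidOf(ExplicitUid.class));
    }
}
```

When the UID stored in the stream differs from the UID of the local class, deserialization fails with "local class incompatible", which is the error shown in the trace.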


On 22/10/2021 16:02, Sweta Kalakuntla wrote:

Hi,

We are seeing an error while upgrading minor versions from 1.13.1 to 
1.13.2. The JobManager is unable to recover the checkpoint state. What 
would be the solution to this issue?


Caused by: org.apache.flink.util.FlinkException: Could not retrieve checkpoint 2844 from state handle under checkpointID-0002844. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
    at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:309) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.io.InvalidClassException: org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor; local class incompatible: stream classdesc serialVersionUID = -5544173933105855751, local class serialVersionUID = 1
    at java.io.ObjectStreamClass.initNonProxy(Unknown Source) ~[?:?]
    at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) ~[?:?]



Thank you,

Sweta K






Upgrade from 1.13.1 to 1.13.2/1.13.3 failing

2021-10-22 Thread Sweta Kalakuntla
Hi,

We are seeing an error while upgrading minor versions from 1.13.1 to 1.13.2.
The JobManager is unable to recover the checkpoint state. What would be the
solution to this issue?

Caused by: org.apache.flink.util.FlinkException: Could not retrieve checkpoint 2844 from state handle under checkpointID-0002844. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
    at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:309) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.io.InvalidClassException: org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor; local class incompatible: stream classdesc serialVersionUID = -5544173933105855751, local class serialVersionUID = 1
    at java.io.ObjectStreamClass.initNonProxy(Unknown Source) ~[?:?]
    at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) ~[?:?]



Thank you,

Sweta K


Re: Fwd: How to deserialize Avro enum type in Flink SQL?

2021-10-22 Thread Timo Walther

Hi Peter,

don't get confused by the year 2017 in the ticket. We had better Avro 
support in the meantime, but it was based on the old type system around 
TypeInformation. Now we need to build up this support again for the new 
type system. I just found this ticket and thought that the title fits. 
We are planning to have better support soon, in 1.15 or in 1.16 at the latest.


Regards,
Timo

On 20.10.21 18:43, Peter Schrott wrote:

Hi Timo,

sorry for being the party-pooper here! :O

My problem with the UDF is that the SQL select will be passed in from 
outside, and the outside world does not know about the UDF.


For the UTF8, I know that feature, unfortunately the schema is already 
up and running and can't be touched that easily. But this would actually 
be covered by your UDF suggestion.


Thanks for the update about the open ticket. It has been open since 2017, so it 
seems unlikely to be fixed in the near future. :)


Best, Peter

On Wed, Oct 20, 2021 at 5:55 PM Timo Walther wrote:


Hi Peter,

as a temporary workaround I would simply implement a UDF like:

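import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

public class EverythingToString extends ScalarFunction {

     // Accepts an argument of any type and returns its string form.
     public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
       return o.toString();
     }
}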

For the Utf8 issue, you can instruct Avro to generate Java classes with
String instead using the `avro.java.string` option.

The rework of the type system messed up the Avro support in Flink. This
is a known issue that is tracked under

https://issues.apache.org/jira/browse/FLINK-8183


Regards,
Timo

On 20.10.21 17:30, Peter Schrott wrote:
 > Hi Timo,
 >
 > thanks a lot for your suggestion.
 >
 > I also considered this workaround, but when going from the DataStream API to
 > the Table API (using the POJO generated by the Maven Avro plugin), types are not
 > mapped correctly, especially Utf8 (Avro's implementation of CharSequence) and
 > enums. In the table I then mostly have RAW types, which are not
 > handy to run SQL statements on. This was already discussed here:
 > https://www.mail-archive.com/user@flink.apache.org/msg9.html
 >
 > Best, Peter
 >
 > On Wed, Oct 20, 2021 at 5:21 PM Timo Walther <twal...@apache.org> wrote:
 >
 >     A current workaround is to use the DataStream API to read the data and
 >     provide your custom Avro schema to configure the format. Then switch to
 >     the Table API.
 >
 >     StreamTableEnvironment.fromDataStream(...) accepts all data types. Enum
 >     classes will be represented as RAW types, but you can forward them as
 >     black boxes or convert them in a UDF.
 >
 >     We will further improve the support of external types in the Table API
 >     type system in the near future.
 >
 >     Regards,
 >     Timo
 >
 >     On 20.10.21 15:51, Peter Schrott wrote:
 >      > Hi people!
 >      >
 >      > I was digging deeper these days and found the "root cause" of the
 >      > issue and the difference between Avro reading from files and Avro
 >      > reading from Kafka & SR.
 >      >
 >      > Please see:
 >      > https://lists.apache.org/x/thread.html/r8ad7bd574f7dc4904139295c7de612a35438571c5b9caac673521d22@%3Cuser.flink.apache.org%3E
 >      >
 >      > The main problem with Kafka & SR is that the
 >      > "org.apache.avro.generic.GenericDatumReader" is initialized with an
 >      > "expected" schema which is taken from Flink's SQL table
 >      > definition. When it comes to deserializing an attribute of
 >      > type "enum", it does not match the expected schema, where this
 >      > same attribute is typed as "string". Hence the Avro deserializer breaks
 >      > here.
 >      >
 >      > Not sure how to tackle that issue. The functioning of the
 >      > "GenericDatumReader" cannot really be changed. A solution could
 >      > be to create an analogous reader for reading data based on the SQL DDL.
 >      > Cheers, Peter
 >      >
 >      > On 2021/10/12 16:18:30 Dongwon Kim wrote:
 >      >> Hi community,
 >

Re: SplitEnumeratorContext callAsync() cleanup

2021-10-22 Thread Fabian Paul
Hi Mason,

This seems to be a bug with the current KafkaSource and also the unified 
Sources in general. Can you open a bug ticket in Jira? I think the enumerator 
should take care of first joining all the async threads before closing the 
enumerator.

Best,
Fabian
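
As an illustration of "joining all the async threads before closing", here is a minimal plain-Java sketch. It is not the KafkaSource or SplitEnumeratorContext implementation. The class, the `callAsync` method, and the `clientClosed` flag are hypothetical stand-ins for an enumerator that runs async work against a shared client.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DrainBeforeClose implements AutoCloseable {
    private final ExecutorService workers = Executors.newSingleThreadExecutor();
    private final AtomicBoolean clientClosed = new AtomicBoolean(false);
    private final AtomicBoolean usedAfterClose = new AtomicBoolean(false);

    /** Hypothetical async call that uses the shared client when its work is done. */
    void callAsync(long workMillis) {
        workers.submit(() -> {
            try {
                Thread.sleep(workMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // Record the bug we want to avoid: touching a client that is already closed.
            if (clientClosed.get()) {
                usedAfterClose.set(true);
            }
        });
    }

    @Override
    public void close() {
        workers.shutdown(); // stop accepting new tasks; submitted tasks still run
        try {
            if (!workers.awaitTermination(10, TimeUnit.SECONDS)) {
                workers.shutdownNow(); // give up waiting and interrupt what is left
            }
        } catch (InterruptedException e) {
            workers.shutdownNow();
            Thread.currentThread().interrupt();
        }
        clientClosed.set(true); // only now release the shared resource
    }

    boolean usedAfterClose() {
        return usedAfterClose.get();
    }

    public static void main(String[] args) {
        DrainBeforeClose enumerator = new DrainBeforeClose();
        enumerator.callAsync(50);
        enumerator.close();
        System.out.println("used after close: " + enumerator.usedAfterClose());
    }
}
```

The ordering is the point of the sketch: the worker pool is drained first, and the shared resource is released only afterwards, so no pending task can observe a closed client.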

Re: [ANNOUNCE] Apache Flink 1.13.3 released

2021-10-22 Thread Yangze Guo
Thank Chesnay, Martijn, and everyone involved!

Best,
Yangze Guo



Re: how to delete all rows one by one in batch execution mode;shutdown cluster after all tasks finished

2021-10-22 Thread vtygoss
Hi!


Thanks for your reply! I think I didn't make myself clear about problem 1, so I 
drew a picture. 


1. "tables in DB": things start at the database, and we sync the tables' 
changelog to dynamic tables with a CDC tool. Each changelog entry includes RowData 
and a RowKind such as INSERT / UPDATE / DELETE. 
2. "logics": modeling layers such as ods / dwd. 
3. "Table 1": a table which has some downstream tables. Table 2 is produced by 
"count(1) from table_1" with Table 1 as input; Table 3 is produced by "udf(...) 
from table_1 where a>0" with Table 1 as input. When an insert or delete event 
occurs in Table 1, Table 2 and Table 3 change accordingly, as do the downstream 
tables of Table 2 and Table 3. 


- problem: The logic which generates Table 1 changes from "select * from 
table_0 where a>0" to "select * from table_0 where a<0". The old data in Table 
1 generated by the filter "a>0" is now wrong, and so are all downstream tables 
of Table 1. I want to find an easy way to truncate the wrong data in Table 1 
and in all its downstream tables. However, truncating Table 1 does not emit a 
deletion event for each record in Table 1, so it doesn't trigger the deletion of 
the corresponding records in the downstream tables, which I think is the most 
important part. So now I want to read all records in Table 1 and change the 
RowKind of each row from RowKind.INSERT to RowKind.DELETE, but I didn't find a 
corresponding API in BatchTableEnvironment or BatchExecutionEnvironment. The 
code I have in mind is below.


```
TableEnvironment tenv;
Table t1 = tenv.from("table_1 /*+OPTIONS('read.streaming.enabled'='false')*/");

// Desired behavior (no such API was found): re-emit every row as a DELETE.
Table t2 = t1.map(row -> row.setRowKind(RowKind.DELETE));
t2.insertInto("table_1");
```
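
To make the difference between truncating and emitting per-row deletions concrete, here is a small plain-Java model. It is not Flink code. `Kind` is a stand-in for Flink's RowKind, and `Change`, `applyToCount`, and `asDeletes` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RetractionSketch {

    /** Stand-in for Flink's RowKind, reduced to the two kinds used here. */
    enum Kind { INSERT, DELETE }

    /** One changelog entry: the value of column "a" plus the kind of change. */
    static final class Change {
        final Kind kind;
        final int a;

        Change(Kind kind, int a) {
            this.kind = kind;
            this.a = a;
        }
    }

    /** Models a downstream "count(1) from table_1" that is maintained incrementally. */
    static long applyToCount(long count, List<Change> changes) {
        for (Change c : changes) {
            count += (c.kind == Kind.INSERT) ? 1 : -1;
        }
        return count;
    }

    /** Re-emits every stored row as a DELETE so that downstream results can retract it. */
    static List<Change> asDeletes(List<Change> stored) {
        List<Change> out = new ArrayList<>();
        for (Change c : stored) {
            out.add(new Change(Kind.DELETE, c.a));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> table1 = Arrays.asList(
                new Change(Kind.INSERT, 1),
                new Change(Kind.INSERT, 2),
                new Change(Kind.INSERT, 3));
        long count = applyToCount(0, table1);
        // Truncating table1 would emit nothing, so the downstream count would keep its old value.
        long afterDeletes = applyToCount(count, asDeletes(table1));
        System.out.println(count + " -> " + afterDeletes);
    }
}
```

In this model the downstream count only returns to zero because each stored row is replayed as a DELETE, which is the behavior the question asks for.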


You suggested creating a new table based on the new logic ("new Table 1'" in 
the picture). I think creating a new table will not solve this problem unless I 
also recreate all downstream tables of Table 1 (for example Table 2'), which is 
too heavy. 


Thanks for your suggestion. Do you have any other suggestions? 


Best Regards!

On Oct 22, 2021, at 10:55, Caizhi Weng wrote:


Hi!


For problem 1, Flink does not support deleting specific records. As you're 
running a batch job, I suggest creating a new table based on the new filter 
condition. Even if you can delete the old records you'll still have to generate 
the new ones, so why not generate them directly into a new place?


For problem 2, yarn-cluster is the mode for a yarn session cluster, which means 
the cluster will remain even after the job is finished. If you want to finish 
the Flink job as well as the yarn job, use yarn-per-job mode instead.


On Thu, Oct 21, 2021 at 2:46 PM vtygoss wrote:



Hi, community!


I am working on building a data processing pipeline based on changelogs (CDC), 
and I ran into two problems. 


--(sql_0)--> Table A --(sql_1)---> Table B --->other tables downstream
  --(sql_2)--->Table C---> other tables downstream


Table A is generated based on sql_0; Table B is generated based on sql_1 and 
input Table A; Table C is generated based on sql_2 and input Table A; Table B 
and C have some downstream tables based on modeling. 


- problem 1. When the sql_0 logic changes, e.g. from "select * from xx where 
a>0" to "select * from xx where a<0", the data produced by the filter "a>0" 
becomes wrong. I want to find a way to clear the wrong data in Table A and 
trigger the corresponding deletions in all downstream tables, then produce new 
data with the new filter a<0. So how can I change the RowKind of each row in 
Table A to RowKind.DELETE in Flink batch execution mode? It would be very nice 
if there were an example for Flink 1.12.0.  


- problem 2. I found that Flink launches a session cluster even when the runtime 
mode is "yarn-cluster". In batch execution mode, the cluster keeps running after 
all tasks have finished. How do I shut down the cluster? 




Thanks for any suggestions or replies!


Best Regards!

[Attachment: 787261bd-dff3-4162-9f6e-876312bd312b.png]


Re: [ANNOUNCE] Apache Flink 1.13.3 released

2021-10-22 Thread Yun Tang
Thanks for Chesnay & Martijn and everyone who made this release happen.

Best
Yun Tang
