Re: [ANNOUNCE] New Apache Flink Committer - Jane Chan

2023-10-16 Thread LakeShen
Congratulations, Jane!

Best,
LakeShen

Sergey Nuyanzin  于2023年10月16日周一 16:18写道:

> Congratulations, Jane!
>
> On Mon, Oct 16, 2023 at 9:39 AM Qingsheng Ren  wrote:
>
> > Congratulations and welcome, Jane!
> >
> > Best,
> > Qingsheng
> >
> > > On Oct 16, 2023, at 14:20, Benchao Li  wrote:
> > >
> > > Congrats, Jane! Well deserved~
> > >
> > > Leonard Xu  于2023年10月16日周一 14:19写道:
> > >>
> > >>
> > >>> Congrats! I noticed Jane has been around for a while; well-deserved.
> > >>>
> > >>> Best,
> > >>> tison.
> > >>
> > >> +1
> > >>
> > >> Best,
> > >> Leonard
> > >>
> > >>
> > >>
> > >>>
> > >>>
> > >>> Jark Wu  于2023年10月16日周一 09:58写道:
> > >>>
> > >>>> Hi, everyone
> > >>>>
> > >>>> On behalf of the PMC, I'm very happy to announce Jane Chan as a new
> > Flink
> > >>>> Committer.
> > >>>>
> > >>>> Jane started code contribution in Jan 2021 and has been active in
> the
> > Flink
> > >>>> community since. She authored more than 60 PRs and reviewed more
> than
> > 40
> > >>>> PRs. Her contribution mainly revolves around Flink SQL, including
> Plan
> > >>>> Advice (FLIP-280), operator-level state TTL (FLIP-292), and ALTER
> > TABLE
> > >>>> statements (FLINK-21634). Jane participated deeply in development
> > >>>> discussions and also helped answer user question emails. Jane was
> > also a
> > >>>> core contributor of Flink Table Store (now Paimon) when the project
> > was in
> > >>>> the early days.
> > >>>>
> > >>>> Please join me in congratulating Jane Chan for becoming a Flink
> > Committer!
> > >>>>
> > >>>> Best,
> > >>>> Jark Wu (on behalf of the Flink PMC)
> > >>>>
> > >>
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> >
> >
>
> --
> Best regards,
> Sergey
>


Re: [ANNOUNCE] New Apache Flink Committer - Ron Liu

2023-10-16 Thread LakeShen
Congratulations, Ron!

Best,
LakeShen

Sergey Nuyanzin  于2023年10月16日周一 16:17写道:

> Congratulations, Ron!
>
> On Mon, Oct 16, 2023 at 9:38 AM Qingsheng Ren  wrote:
>
> > Congratulations and welcome aboard, Ron!
> >
> > Best,
> > Qingsheng
> >
> > > On Oct 16, 2023, at 11:22, yuxia  wrote:
> > >
> > > Congratulations, Ron!
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > - 原始邮件 -
> > > 发件人: "Feng Jin" 
> > > 收件人: "dev" 
> > > 发送时间: 星期一, 2023年 10 月 16日 上午 11:29:55
> > > 主题: Re: [ANNOUNCE] New Apache Flink Committer - Ron Liu
> > >
> > > Congratulations, Ron!
> > >
> > > Best,
> > > Feng
> > >
> > > On Mon, Oct 16, 2023 at 11:22 AM yh z 
> wrote:
> > >
> > >> Congratulations, Ron!
> > >>
> > >> Best,
> > >> Yunhong (SwuferHong)
> > >>
> > >> Yuxin Tan  于2023年10月16日周一 11:12写道:
> > >>
> > >>> Congratulations, Ron!
> > >>>
> > >>> Best,
> > >>> Yuxin
> > >>>
> > >>>
> > >>> Junrui Lee  于2023年10月16日周一 10:24写道:
> > >>>
> > >>>> Congratulations Ron !
> > >>>>
> > >>>> Best,
> > >>>> Junrui
> > >>>>
> > >>>> Yun Tang  于2023年10月16日周一 10:22写道:
> > >>>>
> > >>>>> Congratulations, Ron!
> > >>>>>
> > >>>>> Best
> > >>>>> Yun Tang
> > >>>>> 
> > >>>>> From: yu zelin 
> > >>>>> Sent: Monday, October 16, 2023 10:16
> > >>>>> To: dev@flink.apache.org 
> > >>>>> Cc: ron9@gmail.com 
> > >>>>> Subject: Re: [ANNOUNCE] New Apache Flink Committer - Ron Liu
> > >>>>>
> > >>>>> Congratulations!
> > >>>>>
> > >>>>> Best,
> > >>>>> Yu Zelin
> > >>>>>
> > >>>>>> 2023年10月16日 09:56,Jark Wu  写道:
> > >>>>>>
> > >>>>>> Hi, everyone
> > >>>>>>
> > >>>>>> On behalf of the PMC, I'm very happy to announce Ron Liu as a new
> > >>> Flink
> > >>>>>> Committer.
> > >>>>>>
> > >>>>>> Ron has been continuously contributing to the Flink project for
> > >> many
> > >>>>> years,
> > >>>>>> authored and reviewed a lot of codes. He mainly works on Flink SQL
> > >>>> parts
> > >>>>>> and drove several important FLIPs, e.g., USING JAR (FLIP-214),
> > >>> Operator
> > >>>>>> Fusion CodeGen (FLIP-315), Runtime Filter (FLIP-324). He has a
> > >> great
> > >>>>>> knowledge of the Batch SQL and improved a lot of batch performance
> > >> in
> > >>>> the
> > >>>>>> past several releases. He is also quite active in mailing lists,
> > >>>>>> participating in discussions and answering user questions.
> > >>>>>>
> > >>>>>> Please join me in congratulating Ron Liu for becoming a Flink
> > >>>> Committer!
> > >>>>>>
> > >>>>>> Best,
> > >>>>>> Jark Wu (on behalf of the Flink PMC)
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> >
>
> --
> Best regards,
> Sergey
>


[jira] [Created] (FLINK-32528) The RexCall a = a: if a's datatype is nullable and a is null, then a = a evaluates to null rather than true, in BinaryComparisonExprReducer

2023-07-04 Thread LakeShen (Jira)
LakeShen created FLINK-32528:


 Summary: The RexCall a = a: if a's datatype is nullable and a is null, then a = a 
evaluates to null rather than true, in BinaryComparisonExprReducer
 Key: FLINK-32528
 URL: https://issues.apache.org/jira/browse/FLINK-32528
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.17.0
Reporter: LakeShen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-18440) ROW_NUMBER function: ROW/RANGE not allowed with RANK, DENSE_RANK or ROW_NUMBER functions

2020-06-27 Thread LakeShen (Jira)
LakeShen created FLINK-18440:


 Summary: ROW_NUMBER function: ROW/RANGE not allowed with RANK, 
DENSE_RANK or ROW_NUMBER functions
 Key: FLINK-18440
 URL: https://issues.apache.org/jira/browse/FLINK-18440
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.11.0
Reporter: LakeShen
 Fix For: 1.11.1


When I run Flink SQL like this:

create view test as select  name, eat ,sum(age) as cnt from test_source group 
by  name,eat;

create view results as select *, ROW_NUMBER() OVER (PARTITION BY name ORDER 
BY cnt DESC) as row_num from test;

The same SQL ran successfully in Flink 1.10; after changing the Flink version to 
1.11, it throws the exception in the title.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18376) java.lang.ArrayIndexOutOfBoundsException in RetractableTopNFunction

2020-06-19 Thread LakeShen (Jira)
LakeShen created FLINK-18376:


 Summary: java.lang.ArrayIndexOutOfBoundsException in 
RetractableTopNFunction
 Key: FLINK-18376
 URL: https://issues.apache.org/jira/browse/FLINK-18376
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: LakeShen
 Fix For: 1.11.1


java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:422)
at java.util.ArrayList.get(ArrayList.java:435)
at 
org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.retractRecordWithoutRowNumber(RetractableTopNFunction.java:392)
at 
org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.processElement(RetractableTopNFunction.java:160)
at 
org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.processElement(RetractableTopNFunction.java:54)
at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


What is the RocksDB local directory in flink checkpointing?

2020-05-05 Thread LakeShen
Hi community,

Now I have a question about the Flink checkpoint local directory. Our Flink
version is 1.6 and the job mode is Flink on YARN per-job. I looked at the Flink
source code and found that the checkpoint local directory defaults to /tmp when
"state.backend.rocksdb.localdir" is not configured. But when I go into the /tmp
dir, I couldn't find the Flink checkpoint state local directory.
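
For reference, a minimal sketch (an assumption, not from this thread; the checkpoint
URI and local path are placeholders) of pointing the RocksDB working directory
somewhere explicit instead of relying on the default temp dirs, using the Flink
1.6-era RocksDBStateBackend API:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbLocalDirExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Durable checkpoints still go to the distributed filesystem.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
        // The live RocksDB instance (what "state.backend.rocksdb.localdir" controls)
        // is kept on the local disk of each TaskManager.
        backend.setDbStoragePath("/data/flink/rocksdb");
        env.setStateBackend(backend);
    }
}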

What is the RocksDB local directory in flink checkpointing?  I am looking
forward to your reply.

Best,
LakeShen


Flink on YARN with ResourceManager HA: if the active ResourceManager changes, what is the Flink job's status?

2020-04-15 Thread LakeShen
Hi community,

I have a question about Flink on YARN with HA: if the active ResourceManager
changes, what happens to the Flink job's status? Does the job keep running
normally, or must I restart it?

Thanks to your reply.

Best,
LakeShen


Re: [VOTE] FLIP-122: New Connector Property Keys for New Factory

2020-04-02 Thread LakeShen
+1 (non-binding)

Benchao Li  于2020年4月3日周五 上午9:50写道:

> +1 (non-binding)
>
> Dawid Wysakowicz  于2020年4月3日周五 上午12:33写道:
>
> > +1
> >
> > Best,
> >
> > Dawid
> >
> > On 02/04/2020 18:28, Timo Walther wrote:
> > > +1
> > >
> > > Thanks,
> > > Timo
> > >
> > > On 02.04.20 17:22, Jark Wu wrote:
> > >> Hi all,
> > >>
> > >> I would like to start the vote for FLIP-122 [1], which is discussed
> and
> > >> reached a consensus in the discussion thread [2].
> > >>
> > >> The vote will be open for at least 72h, unless there is an objection
> > >> or not
> > >> enough votes.
> > >>
> > >> Thanks,
> > >> Timo
> > >>
> > >> [1]
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
> > >>
> > >> [2]
> > >>
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-122-New-Connector-Property-Keys-for-New-Factory-td39462.html
> > >>
> > >>
> > >
> >
> >
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Question about the flink 1.6 memory config

2020-03-31 Thread LakeShen
Hi community,

Now I am optimizing the memory configuration of a Flink 1.6 task. From the
source code, the task first computes the cut-off memory as
cut-off memory = Math.max(600, containerized.heap-cutoff-ratio * TaskManager memory),
where containerized.heap-cutoff-ratio defaults to 0.25. For example, with 4 GB of
TaskManager memory the cut-off memory is 1 GB.
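
A minimal sketch (an assumption; it only restates the formula above with the Flink
1.6 defaults containerized.heap-cutoff-min = 600 MB and ratio = 0.25) of how the
numbers work out, including the proposed ratio of 0.15:

public class CutoffArithmetic {
    public static void main(String[] args) {
        long taskManagerMemoryMb = 4096;   // total container memory
        long cutoffMinMb = 600;            // containerized.heap-cutoff-min
        double defaultRatio = 0.25;        // containerized.heap-cutoff-ratio default
        double proposedRatio = 0.15;

        // default: max(600, 4096 * 0.25) = 1024 MB cut-off
        long defaultCutoff = Math.max(cutoffMinMb, (long) (taskManagerMemoryMb * defaultRatio));
        // proposed: max(600, 4096 * 0.15) = 614 MB cut-off
        long proposedCutoff = Math.max(cutoffMinMb, (long) (taskManagerMemoryMb * proposedRatio));

        System.out.println("default cut-off  = " + defaultCutoff + " MB");
        System.out.println("proposed cut-off = " + proposedCutoff + " MB");
    }
}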

However, after enabling the TaskManager's GC log, I found that the metaspace only
uses about 60 MB. I personally feel that the cut-off memory configuration is a
little too large. Can the cut-off be reduced, for example by setting
containerized.heap-cutoff-ratio to 0.15?
Would that configuration cause any problems?

I am looking forward to your reply.

Best wishes,
LakeShen


Re: [DISCUSS] FLIP-122: New Connector Property Keys for New Factory

2020-03-29 Thread LakeShen
Hi Jark,

I am really looking forward to this feature. I think it could simplify Flink SQL
code and, at the same time, make it easier for developers to configure the WITH
options of a Flink SQL job.

When I write Flink SQL tasks today, the WITH options are sometimes too long for
users. For example, configuring the consumer group and brokers of the Kafka
source connector looks like this:

  'connector.properties.0.key' = 'group.id'
>  , 'connector.properties.0.value' = 'xxx'
>  , 'connector.properties.1.key' = 'bootstrap.servers'
>  , 'connector.properties.1.value' = 'x'
>

I can understand this configuration, but it may be confusing for someone new to
Flink. So I am really looking forward to this feature; thank you for proposing
it.

Best wishes,
LakeShen


Jark Wu  于2020年3月30日周一 下午2:02写道:

> Hi everyone,
>
> I want to start a discussion about further improving and simplifying our current
> connector property keys, aka WITH options. Currently, we have a
> 'connector.' prefix for many properties, but they are verbose, and we saw a
> big inconsistency between the properties when designing FLIP-107.
>
> So we propose to remove all the 'connector.' prefix and rename
> 'connector.type' to 'connector', 'format.type' to 'format'. So a new Kafka
> DDL may look like this:
>
> CREATE TABLE kafka_table (
>  ...
> ) WITH (
>  'connector' = 'kafka',
>  'version' = '0.10',
>  'topic' = 'test-topic',
>  'startup-mode' = 'earliest-offset',
>  'properties.bootstrap.servers' = 'localhost:9092',
>  'properties.group.id' = 'testGroup',
>  'format' = 'json',
>  'format.fail-on-missing-field' = 'false'
> );
>
> The new connector property key set will come together with the new Factory
> interface which is proposed in FLIP-95. Old properties are still compatible
> with their existing implementation. New properties are only available in
> new DynamicTableFactory implementations.
>
> You can access the detailed FLIP here:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
>
> Best,
> Jark
>


The question about the FLIP-45

2020-03-19 Thread LakeShen
Hi community,

Now I am reading FLIP-45 (Reinforce Job Stop Semantic), and I have two questions
about it:
1. Which command should be used to stop a Flink job, stop or cancel?

2. When using the stop command, I see in the Flink source code that we can set a
savepoint directory, and the default savepoint directory is used if we don't set
one. If both the target savepoint directory and the default savepoint directory
are null, Flink throws an exception. But FLIP-45 says that if retained
checkpoints are enabled, we should always take a checkpoint when stopping the
job. I can't find this code.

Thanks to your reply.

Best regards,
LakeShen


[jira] [Created] (FLINK-16681) JDBC JDBCOutputFormat and JDBCLookupFunction PreparedStatement lose the connection if there are no records to write for a long time

2020-03-19 Thread LakeShen (Jira)
LakeShen created FLINK-16681:


 Summary: JDBC JDBCOutputFormat and JDBCLookupFunction PreparedStatement lose 
the connection if there are no records to write for a long time.
 Key: FLINK-16681
 URL: https://issues.apache.org/jira/browse/FLINK-16681
 Project: Flink
  Issue Type: Improvement
Reporter: LakeShen
 Fix For: 1.10.2


In my view, the JDBC connector is one of the most frequently used connectors in 
Flink, but there may be a problem with it. For example, if there are no records to 
write, or no records to join with the dimension table, for a long time, an 
exception like this is thrown:

 java.sql.SQLException: No operations allowed after statement closed

Because no records have been written for a long time, the PreparedStatement loses 
its connection.
If there is no other JIRA for this problem, could you assign this one to me? I 
will try my best to solve it, thank you.
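
A hypothetical sketch (not the actual FLINK-16681 change; the class name, URL and
table are made up) of the kind of guard the issue suggests: validate the idle JDBC
connection and rebuild it, together with its PreparedStatement, before writing:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class ReconnectingJdbcWriter {
    private final String url = "jdbc:mysql://host:3306/db";  // placeholder
    private Connection connection;
    private PreparedStatement statement;

    private void ensureOpen() throws SQLException {
        // isValid() pings the server; if the idle connection was closed, rebuild it
        // and the PreparedStatement that belongs to it.
        if (connection == null || !connection.isValid(5)) {
            connection = DriverManager.getConnection(url, "user", "password");
            statement = connection.prepareStatement("INSERT INTO t (id, v) VALUES (?, ?)");
        }
    }

    public void write(int id, String v) throws SQLException {
        ensureOpen();
        statement.setInt(1, id);
        statement.setString(2, v);
        statement.executeUpdate();
    }
}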



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16639) Flink SQL Kafka source connector: add a parameter to filter non-JSON records when format.type is json

2020-03-17 Thread LakeShen (Jira)
LakeShen created FLINK-16639:


 Summary: Flink SQL Kafka source connector: add a parameter to filter non-JSON 
records when format.type is json
 Key: FLINK-16639
 URL: https://issues.apache.org/jira/browse/FLINK-16639
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Reporter: LakeShen
 Fix For: 1.10.2


In my view, the Kafka source connector is one of the most frequently used 
connectors in Flink SQL. It supports JSON, CSV and other data formats, but there 
is a problem with the JSON format in the Kafka source connector. For example, a 
Flink SQL Kafka source DDL looks like this:
CREATE TABLE team_olap_table (
  a varchar,
  b varchar
) with (
  'connector.type' = 'kafka', 
  'connector.version' = '0.10', 
  'connector.topic' = 'topics', 
  'connector.properties.0.key' = 'group.id', 
  'connector.properties.0.value' = 'hello_world', 
  'connector.properties.1.key' = 'bootstrap.servers', 
  'connector.properties.1.value' = 'xxx', 
  'connector.property-version' = '1', 
  'connector.startup-mode' = 'latest-offset', 
  'format.type' = 'json', 
  'format.property-version' = '1', 
  'format.derive-schema' = 'true', 
  'update-mode' = 'append'
);

If even one or two messages in the Kafka topic are not in JSON format, the Flink 
SQL task will keep failing over.
In order to solve this problem, when a Flink SQL source connector uses the JSON 
format, I want to add a 'format.fail-on-not-json-record' parameter to the 
flink-json module: if the parameter is true (the default), Flink fails when it 
reads a non-JSON record; if it is false, the Flink SQL task filters out non-JSON 
records and keeps running normally.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-115: Filesystem connector in Table

2020-03-17 Thread LakeShen
Hi Jingsong ,

I am looking forward to this feature, because some streaming applications need to
transfer their messages to HDFS for offline analysis.

Best wishes,
LakeShen

Stephan Ewen  于2020年3月17日周二 下午7:42写道:

> I would really like to see us converging the stack and the functionality
> here.
> Meaning to try and use the same sinks in the Table API as for the
> DataStream API, and using the same sink for batch and streaming.
>
> The StreamingFileSink has a lot of things that can help with that. If
> possible, it would be nice to extend it (which would help move towards the
> above goal) rather than build a second sink. Building a second sink leads
> us further away from unification.
>
> I am a bit puzzled by the statement that sinks are primarily for Hive. The
> Table API should not be coupled to Hive, it should be an independent
> batch/streaming API for many use cases, supporting very well for batch and
> streaming interplay. Supporting Hive is great, but we should not be
> building this towards Hive, as just yet another Hive runtime. Why "yet
> another Hive runtime" when what we have a unique streaming engine that can
> do much more? We would drop our own strength and reduce ourselves to a
> limited subset.
>
> Let's build a File Sink that can also support Hive, but can do so much
> more. For example, efficient streaming file ingestion as materialized views
> from changelogs.
>
>
> *## Writing Files in Streaming*
>
> To write files in streaming, I don't see another way than using the
> streaming file sink. If you want to write files across checkpoints, support
> exactly-once, and support consistent "stop with savepoint", it is not
> trivial.
>
> A part of the complexity comes from the fact that not all targets are
> actually file systems, and not all have simple semantics for persistence.
> S3 for example does not support renames (only copies, which may take a lot
> of time) and it does not support flush/sync of data (the S3 file system in
> Hadoop exposes that but it does not work. flush/sync, followed by a
> failure, leads to data loss). You need to devise a separate protocol for
> that, which is exactly what has already been done and abstracted behind the
> recoverable writers.
>
> If you re-engineer that in the Table API, you will end up either missing many things
> (intermediate persistence on different file systems, and atomic commit in
> the absence of renames, etc.), or you end up doing something similar as the
> recoverable writers do.
>
>
> *## Atomic Commit in Batch*
>
> For batch sinks, it is also desirable to write the data first and then
> atomically commit it once the job is done.
> Hadoop has spent a lot of time making this work, see this doc here,
> specifically the section on 'The "Magic" Committer'. [1]
>
> What Flink has built in the RecoverableWriter is in some way an even better
> version of this, because it works without extra files (we pass data through
> checkpoint state) and it supports not only committing once at the end, but
> committing multiple time intermediate parts during checkpoints.
>
> Meaning using the recoverable writer mechanism in batch would allow us to
> immediately get the efficient atomic commit implementations on file://
> hdfs:// and s3://, with a well defined way to implement it also for other
> file systems.
>
>
> *## Batch / Streaming Unification*
>
> It would be great to start looking at these things in the same way:
>   - streaming (exactly-once): commits files (after finished) at the next
> checkpoint
>   - batch: single commit at the end of the job
>
>
> *## DataStream / Table API Stack Unification*
>
> Having the same set of capabilities would make it much easier for users to
> understand the system.
> Especially when it comes to consistent behavior across external systems.
> Having a different file sink in Table API and DataStream API means that
> DataStream can write correctly to S3 while Table API cannot.
>
>
> *## What is missing?*
>
> It seems there are some things that get in the way of naturally reusing the
> StreamingFileSink here.
> Can you make a list of the features missing in the StreamingFileSink that would
> make it usable for the use cases you have in mind?
>
> Best,
> Stephan
>
> [1]
>
> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committer_architecture.html
>
>
> On Mon, Mar 16, 2020 at 12:31 PM Jingsong Li 
> wrote:
>
> > Hi Piotr,
> >
> > I am very entangled.
> >
> > Let me re-list the table streaming sink requirements:
> > - In table, maybe 90% sinks are for Hive. The parquet and orc are the
> most
> > important formats. Hive provide RecordWriters, it is 

Question about RocksDBStateBackend Compaction Filter state cleanup

2020-03-17 Thread LakeShen
Hi community ,

I am looking at the Flink RocksDBStateBackend state cleanup; the code currently looks like this:

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
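// the argument below (1000) is the number of state entries the compaction filter
// processes before it queries the current timestamp again, per the docs quoted below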
.cleanupInRocksdbCompactFilter(1000)
.build();



> The default background cleanup for RocksDB backend queries the current
> timestamp each time 1000 entries have been processed.


What is the meaning of "1000 entries"? Is it 1000 different keys?

Thanks to your reply.

Best regards,
LakeShen


Re: How to change the flink web-ui jobServer?

2020-03-14 Thread LakeShen
Ok, thanks! Arvid

Arvid Heise  于2020年3月10日周二 下午4:14写道:

> Hi LakeShen,
>
> you can change the port with
>
> conf.setInteger(RestOptions.PORT, 8082);
>
> or, if you want to be on the safe side, specify a range
>
> conf.setString(RestOptions.BIND_PORT, "8081-8099");
>
>
> On Mon, Mar 9, 2020 at 10:47 AM LakeShen 
> wrote:
>
>> Hi community,
>>now I am moving the flink job to k8s,and I plan to use the ingress
>> to show the flink web ui  , the problem is that fink job server aren't
>> correct, so I want to change the flink web-ui jobserver ,I don't find the
>> any method  to change it ,are there some method to do that?
>>Thanks to your reply.
>>
>> Best wishes,
>> LakeShen
>>
>
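
For context, a minimal self-contained sketch (an assumption, not from the thread;
the ports are just examples) of where the RestOptions snippets quoted above could
be applied, here for a local environment with the web UI enabled:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestPortExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8082);               // fixed REST / web UI port
        // conf.setString(RestOptions.BIND_PORT, "8081-8099"); // or a port range
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // build and execute the job against env as usual
    }
}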


Re: Cancel the Flink task and restore from checkpoint: can I change the Flink operator's parallelism?

2020-03-13 Thread LakeShen
Hi Eleanore, if you resume from a savepoint, you can't change the Flink
operator's max parallelism either.
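
A minimal sketch (an assumption, not from the thread) of pinning the max
parallelism explicitly so that later restores from a checkpoint or savepoint can
change the regular parallelism while the max parallelism stays fixed:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(128); // must stay the same across restores
        env.setParallelism(4);      // this value may be changed when resuming
    }
}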

Eleanore Jin  于2020年3月14日周六 上午12:51写道:

> Hi Piotr,
> Does this also apply to savepoint? (meaning the max parallelism should not
> change for job resume from savepoint?)
>
> Thanks a lot!
> Eleanore
>
> On Fri, Mar 13, 2020 at 6:33 AM Piotr Nowojski 
> wrote:
>
> > Hi,
> >
> > Yes, you can change the parallelism. One thing that you can not change is
> > “max parallelism”.
> >
> > Piotrek
> >
> > > On 13 Mar 2020, at 04:34, Sivaprasanna 
> > wrote:
> > >
> > > I think you can modify the operator’s parallelism. It is only if you
> > have set maxParallelism, and while restoring from a checkpoint, you
> > shouldn’t modify the maxParallelism. Otherwise, I believe the state will
> be
> > lost.
> > >
> > > -
> > > Sivaprasanna
> > >
> > > On Fri, 13 Mar 2020 at 9:01 AM, LakeShen wrote:
> > > Hi community,
> > >   I have a question is that I cancel the flink task and retain the
> > checkpoint dir, then restore from the checkpoint dir ,can I change the
> > flink operator's parallelism,in my thoughts, I think I can't change the
> > flink operator's parallelism,but I am not sure.
> > >  Thanks to your reply.
> > >
> > > Best wishes,
> > > LakeShen
> >
> >
>


Re: Flink 1.10 StopWithSavepoint: is it only suitable for sources that implement the StoppableFunction interface?

2020-03-12 Thread LakeShen
Thanks a lot!, tison

tison  于2020年3月12日周四 下午5:56写道:

> The StoppableFunction is gone.
>
> See also https://issues.apache.org/jira/browse/FLINK-11889
>
> Best,
> tison.
>
>
> LakeShen  于2020年3月12日周四 下午5:44写道:
>
>> Hi community,
>> now  I am seeing the FLIP-45 , as I see the stop command only
>> suit for the sources that implement the StoppableFunction interface.
>> So I have a question is that if I use StopWithSavepoint command
>> to stop my flink task , just like './flink stop -p xxx ...', this
>> command only suit for the sources that implement the StoppableFunction
>> interface, is it correct?
>> Thanks to your reply.
>>
>> Best wishes,
>> LakeShen
>>
>


Cancel the Flink task and restore from checkpoint: can I change the Flink operator's parallelism?

2020-03-12 Thread LakeShen
Hi community,
  My question is: if I cancel the Flink task but retain the checkpoint dir, and
then restore from that checkpoint dir, can I change the Flink operator's
parallelism? My understanding is that I can't, but I am not sure.
 Thanks for your reply.

Best wishes,
LakeShen


Flink 1.10 StopWithSavepoint: is it only suitable for sources that implement the StoppableFunction interface?

2020-03-12 Thread LakeShen
Hi community,
now I am reading FLIP-45, and it seems the stop command is only suitable for
sources that implement the StoppableFunction interface.
So my question is: if I use the StopWithSavepoint command to stop my Flink task,
e.g. './flink stop -p xxx ...', does this command only work for sources that
implement the StoppableFunction interface? Is that correct?
Thanks to your reply.

Best wishes,
LakeShen


How to change the flink web-ui jobServer?

2020-03-09 Thread LakeShen
Hi community,
   now I am moving the Flink job to Kubernetes, and I plan to use an ingress to
expose the Flink web UI. The problem is that the Flink job server address isn't
correct, so I want to change the Flink web UI job server, but I can't find any
method to change it. Is there a way to do that?
   Thanks to your reply.

Best wishes,
LakeShen


Re: Flink Web UI display nothing in k8s when use ingress

2020-03-03 Thread LakeShen
On reflection, I think I should configure the correct Flink job server for the
Flink task.

LakeShen  于2020年3月4日周三 下午2:07写道:

> Hi community,
> now we plan to move all flink tasks to k8s cluster. For one flink
> task , we want to see this flink task web ui . First , we create the k8s
> Service to expose 8081 port of jobmanager, then we use ingress controller
> so that we can see it outside.But the flink web like this :
>
> [image: image.png]
>
> The flink web ui images and other info not display , what can I do to
> display flink web info ?
> Thanks to your replay.
>


Flink Web UI display nothing in k8s when use ingress

2020-03-03 Thread LakeShen
Hi community,
now we plan to move all Flink tasks to a Kubernetes cluster. For one Flink
task, we want to see its web UI. First, we create a Kubernetes Service to expose
port 8081 of the JobManager, and then we use an ingress controller so that we can
reach it from outside. But the Flink web UI looks like this:

[image: image.png]

The Flink web UI images and other info are not displayed; what can I do to
display the Flink web info?
Thanks for your reply.


Re: Flink 1.10 exception : Unable to instantiate java compiler

2020-02-28 Thread LakeShen
 I have solved this problem by setting the flink-table-planner-blink Maven
dependency scope to provided.

kant kodali  于2020年2月28日周五 下午3:32写道:

> Same problem!
>
> On Thu, Feb 27, 2020 at 11:10 PM LakeShen 
> wrote:
>
>> Hi community,
>>   now  I am using the flink 1.10 to run the flink task
>> ,cluster type is yarn . I use commandline to submit my flink job , the
>> commandline just like this :
>>
>> flink run  -m yarn-cluster  --allowNonRestoredState  -c xxx.xxx.xx
>>  flink-stream-xxx.jar
>>
>> Bug there is a exception to throw,the exception info is :
>>
>> *org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Unable to instantiate java compiler*
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.lang.IllegalStateException: Unable to instantiate java
>> compiler
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:475)
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:488)
>> at
>> org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:193)
>> at
>> org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:797)
>> at
>> org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:298)
>> at
>> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
>> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
>> at
>> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
>> at
>> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>> at
>> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
>> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>> at
>> org.apac

Flink 1.10 exception : Unable to instantiate java compiler

2020-02-27 Thread LakeShen
Hi community,
  now I am using Flink 1.10 to run a Flink task; the cluster type is YARN. I
submit my Flink job from the command line, like this:

flink run  -m yarn-cluster  --allowNonRestoredState  -c xxx.xxx.xx
 flink-stream-xxx.jar

But an exception is thrown; the exception info is:

*org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: Unable to instantiate java compiler*
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.IllegalStateException: Unable to instantiate java
compiler
at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
at
org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
at
org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:475)
at
org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:488)
at
org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:193)
at
org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:797)
at
org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:298)
at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
at
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
at
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at
org

Flink 1.6, incremental checkpoints: the shared dir still stores last year's checkpoint state

2020-01-19 Thread LakeShen
Hi community,
now I have a Flink SQL job and I set the Flink SQL state retention time. There
are three directories in the Flink checkpoint dir:
1. the chk-xx dir
2. the shared dir
3. the taskowned dir

I find that the shared dir still stores checkpoint state from last year. The only
reason I can think of is that the latest checkpoint retains references to last
year's checkpoint state files.
Are there any other reasons that could lead to this, or is it a bug?

Thanks for your reply.

Best wishes,
Lake Shen


Frequent checkpoint failures: could they prevent the Flink SQL state from being cleared?

2020-01-16 Thread LakeShen
Hi community, I am now using Flink SQL and have set the state retention time. As
far as I know, Flink registers a timer per key to clear that key's state. If the
Flink task's checkpoints always fail, is the key state still cleared by those
timers?
Thanks for your reply.


Flink Sql Join, how to clear the sql join state?

2020-01-15 Thread LakeShen
Hi community, I am now using a Flink SQL inner join in my code. According to the
Flink documentation, a Flink SQL inner join keeps both sides of the join input in
Flink's state forever.
As a result, the HDFS files grow very large. Is there any way to clear the SQL
join state?
Thanks to your reply.
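
A minimal sketch (an assumption, based on the idle state retention setting in the
Flink docs; exact API availability depends on the Flink version) of bounding join
state instead of keeping it forever:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // State for a key that has been idle for 12h becomes eligible for cleanup
        // and is guaranteed to be removed after 24h, which bounds the join state.
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    }
}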


Re: How long is the Flink state default TTL if I don't configure the state TTL config?

2020-01-05 Thread LakeShen
Ok, got it ,thank you

Zhu Zhu  于2020年1月6日周一 上午10:30写道:

> Yes. State TTL is by default disabled.
>
> Thanks,
> Zhu Zhu
>
> LakeShen  于2020年1月6日周一 上午10:09写道:
>
>> I saw the flink source code, I find the flink state ttl default is
>> never expire,is it right?
>>
>> LakeShen  于2020年1月6日周一 上午9:58写道:
>>
>>> Hi community,I have a question about flink state ttl.If I don't config
>>> the flink state ttl config,
>>> How long the flink state retain?Is it forever retain in hdfs?
>>> Thanks your replay.
>>>
>>


Re: How long is the Flink state default TTL if I don't configure the state TTL config?

2020-01-05 Thread LakeShen
I looked at the Flink source code and found that the Flink state TTL default is
to never expire. Is that right?

LakeShen  于2020年1月6日周一 上午9:58写道:

> Hi community,I have a question about flink state ttl.If I don't config the
> flink state ttl config,
> How long the flink state retain?Is it forever retain in hdfs?
> Thanks your replay.
>


How long is the Flink state default TTL if I don't configure the state TTL config?

2020-01-05 Thread LakeShen
Hi community, I have a question about Flink state TTL. If I don't configure the
state TTL config, how long is the Flink state retained? Is it retained forever in
HDFS?
Thanks for your reply.


Flink 1.9 SQL Kafka connector, JSON format: how to deal with non-JSON messages?

2019-12-25 Thread LakeShen
Hi community, when I write a Flink DDL like this:

CREATE TABLE kafka_src (
  id varchar,
  a varchar,
  b TIMESTAMP,
  c TIMESTAMP
)
  with (
   ...
'format.type' = 'json',
'format.property-version' = '1',
'format.derive-schema' = 'true',
'update-mode' = 'append'
);

If a message is not in JSON format, there is an error in the log.
My question is: how should I deal with messages that are not in JSON format?
My thought is that I could catch the exception in the
JsonRowDeserializationSchema deserialize() method; is there any parameter to do
this?
Thanks for your reply.
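
A hypothetical sketch (the class name and the wrapping approach are assumptions,
not an existing Flink option) of the idea above: wrap the JSON deserializer and
drop records that fail to parse instead of failing the job:

import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;

public class SkipInvalidJsonSchema implements DeserializationSchema<Row> {
    private final DeserializationSchema<Row> inner;

    public SkipInvalidJsonSchema(DeserializationSchema<Row> inner) {
        this.inner = inner;
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        try {
            return inner.deserialize(message);
        } catch (IOException e) {
            // returning null lets the Flink Kafka consumer silently skip the bad record
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return inner.getProducedType();
    }
}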


Flink on K8s: building the Docker image is very slow, is there some way to make it faster?

2019-12-22 Thread LakeShen
Hi community, when I run a Flink task on K8s, the first step is to build the
Flink task jar into a Docker image. I find that building the Docker image takes a
lot of time. Is there some way to make it faster?
Thanks for your reply.


Re: Flink SQL Kafka topic DDL: a Kafka JSON field conflicts with Flink SQL keywords

2019-12-09 Thread LakeShen
Thank you, lucas.

lucas.wu  于2019年12月10日周二 下午2:12写道:

> You can use ` ` to surround the field
>
>
> 原始邮件
> 发件人:lakeshenshenleifight...@gmail.com
> 收件人:dev...@flink.apache.org; useru...@flink.apache.org
> 发送时间:2019年12月10日(周二) 14:05
> 主题:Flink SQL Kafka topic DDL ,the kafka' json field conflict with flinkSQL
> Keywords
>
>
> Hi community, when I use Flink SQL DDL ,the kafka' json field conflict
> with flink SQL Keywords,my thought is that using the UDTF to solve it . Is
> there graceful way to solve this problem?
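
A minimal sketch (an assumption; table name, field names and planner settings are
made up, and the WITH options are elided) of the backtick escaping suggested
above, for a field whose name collides with a SQL keyword:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReservedKeywordDdlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // `timestamp` collides with a SQL keyword, so it is escaped with backticks;
        // the real connector properties must still be filled into the WITH clause.
        tEnv.sqlUpdate(
                "CREATE TABLE kafka_src (" +
                "  id VARCHAR," +
                "  `timestamp` VARCHAR" +
                ") WITH (" +
                "  'connector.type' = 'kafka'" +
                ")");
    }
}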


Flink SQL Kafka topic DDL: a Kafka JSON field conflicts with Flink SQL keywords

2019-12-09 Thread LakeShen
Hi community, when I use Flink SQL DDL, a Kafka JSON field conflicts with a Flink
SQL keyword. My thought is to use a UDTF to work around it. Is there a more
graceful way to solve this problem?


How long is the flink sql task state default ttl?

2019-11-06 Thread LakeShen
Hi community, as far as I know I can use the idle state retention time to clear
the Flink SQL task state. My question is: how long is the default TTL of the
Flink SQL task state? Thanks.


Re: Blink Planner HBaseUpsertTableSink Exception

2019-09-12 Thread LakeShen
Thank you, Jark. I had added the primary key in my Flink SQL before, and it threw
"Primary key and unique key are not supported yet." Now I understand it; thank
you sincerely for your reply.

Best wishes,
LakeShen


Jark Wu  于2019年9月12日周四 下午3:15写道:

> Hi Lake,
>
> This is not a problem of HBaseUpsertTableSink.
> This is because the query loses primary key (e.g. concat(key1, key2) will
> lose the primary key information [key1, key2] currently.),
> but the validation of inserting checks the upsert query should have a
> primary key. That’s why the exception is thrown.
>
> IMO, in order to fix this problem, we need to enrich the primary key
> inference to support all kinds of built-in function/operators.
> But this is a large work which means it may not happen in 1.9.1.
>
> Regards,
> Jark
>
> On Thu, 12 Sep 2019 at 14:39, LakeShen  wrote:
>
> > Hi community , when I create the hbase sink table  in my  flink ddl sql
> > ,just like this:
> >
> >
> >
> >
> >
> > *create table sink_hbase_table(rowkey VARCHAR,cf
> >   row(kdt_it_count  bigint )) with (xx);*
> >
> > and I run my flink task , it throws the exception like this :
> > *UpsertStreamTableSink requires that Table has a full primary keys if it
> is
> > updated.*
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:115)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> > at
> >
> >
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
> > at
> >
> >
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at
> >
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >
> > I saw the flink source code , I find that in HBaseUpsertTableSink , the
> > method setKeyFields doesnt' has any code content,in StreamExecSink
> class,I
> > saw the code content like this :
> > *//TODO UpsertStreamTableSink setKeyFields interface should be
> > Array[Array[String]]*
> > but now the  UpsertStreamTableSink setKeyFields interface is
> Array[String],
> > it seems like the conflict with the above content.
> > Could we use HBaseUpsertTableSink in our flink task?Thanks your reply.
> >
>


Blink Planner HBaseUpsertTableSink Exception

2019-09-11 Thread LakeShen
Hi community, when I create the HBase sink table in my Flink DDL SQL like this:

create table sink_hbase_table (
  rowkey VARCHAR,
  cf row(kdt_it_count bigint)
) with (xx);

and when I run my Flink task, it throws an exception like this:
UpsertStreamTableSink requires that Table has a full primary keys if it is
updated.
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:115)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

Looking at the Flink source code, I find that in HBaseUpsertTableSink the
setKeyFields method has an empty body, and in the StreamExecSink class there is a
comment:
//TODO UpsertStreamTableSink setKeyFields interface should be Array[Array[String]]
But the current UpsertStreamTableSink setKeyFields interface is Array[String],
which seems to conflict with that comment.
Can we use HBaseUpsertTableSink in our Flink task? Thanks for your reply.


Re: How to use Flink Command flink run -yt -C?

2019-08-27 Thread LakeShen
I have solved this problem, thanks.

LakeShen  于2019年8月27日周二 下午4:55写道:

> Hi community , I have a question about flink command, when I use flink run
> command to submit my flink job to yarn , I use -yt to upload my function
> jar , but I set -C file:xxxfunction.jar , the flink job throw the exception
> like this :
>  The main method caused an error: Could not find class
> 'com.youzan.bigdata.HashCode' for creating an instance.
> stderr> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
> stderr> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> stderr> at
> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
> stderr> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
> stderr> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
> stderr> at
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
> stderr> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> stderr> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> stderr> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> stderr> at java.security.AccessController.doPrivileged(Native Method)
> stderr> at javax.security.auth.Subject.doAs(Subject.java:422)
> stderr> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> stderr> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:
>
> When I set -yt to upload my Function jar , what value will I set  -C
> parameters?  Thanks.
>


How to use Flink Command flink run -yt -C?

2019-08-27 Thread LakeShen
Hi community, I have a question about the flink command. When I use flink run to
submit my Flink job to YARN, I use -yt to upload my function jar, but when I set
-C file:xxxfunction.jar the Flink job throws an exception like this:
 The main method caused an error: Could not find class
'com.youzan.bigdata.HashCode' for creating an instance.
stderr> at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
stderr> at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
stderr> at
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
stderr> at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
stderr> at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
stderr> at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
stderr> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
stderr> at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
stderr> at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
stderr> at java.security.AccessController.doPrivileged(Native Method)
stderr> at javax.security.auth.Subject.doAs(Subject.java:422)
stderr> at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
stderr> at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:

When I use -yt to upload my function jar, what value should I set for the -C
parameter? Thanks.


NoSuchMethodError: org.apache.calcite.tools.FrameworkConfig.getTraitDefs()

2019-07-30 Thread LakeShen
Hi all, when I use the Blink flink-sql-parser module, the Maven dependency looks
like this:


<dependency>
    <groupId>com.alibaba.blink</groupId>
    <artifactId>flink-sql-parser</artifactId>
    <version>1.5.1</version>
</dependency>


I also import the Flink 1.9 blink-table-planner module and use FlinkPlannerImpl
to parse the SQL to get the List. But when I run the program, it throws an
exception like this:



Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/shaded/calcite/com/google/common/collect/ImmutableList;
at org.apache.flink.sql.parser.plan.FlinkPlannerImpl.<init>(FlinkPlannerImpl.java:93)
at com.youzan.bigdata.allsqldemo.utils.FlinkSqlUtil.getSqlNodeInfos(FlinkSqlUtil.java:33)
at com.youzan.bigdata.allsqldemo.KafkaSrcKafkaSinkSqlDemo.main(KafkaSrcKafkaSinkSqlDemo.java:56)

How can I solve this problem? Thanks to your reply.


Re: [ANNOUNCE] Kete Young is now part of the Flink PMC

2019-07-23 Thread LakeShen
Congratulations Kurt!

Congxian Qiu  于2019年7月23日周二 下午5:37写道:

> Congratulations Kurt!
> Best,
> Congxian
>
>
> Dian Fu  于2019年7月23日周二 下午5:36写道:
>
> > Congrats, Kurt!
> >
> > > 在 2019年7月23日,下午5:33,Zili Chen  写道:
> > >
> > > Congratulations Kurt!
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > JingsongLee  于2019年7月23日周二 下午5:29写道:
> > >
> > >> Congratulations Kurt!
> > >>
> > >> Best, Jingsong Lee
> > >>
> > >>
> > >> --
> > >> From:Robert Metzger 
> > >> Send Time:2019年7月23日(星期二) 17:24
> > >> To:dev 
> > >> Subject:[ANNOUNCE] Kete Young is now part of the Flink PMC
> > >>
> > >> Hi all,
> > >>
> > >> On behalf of the Flink PMC, I'm happy to announce that Kete Young is
> now
> > >> part of the Apache Flink Project Management Committee (PMC).
> > >>
> > >> Kete has been a committer since February 2017, working a lot on Table
> > API /
> > >> SQL. He's currently co-managing the 1.9 release! Thanks a lot for your
> > work
> > >> for Flink!
> > >>
> > >> Congratulations & Welcome Kurt!
> > >>
> > >> Best,
> > >> Robert
> > >>
> >
> >
>


Does the REST API /jobs/:jobid/yarn-cancel trigger a savepoint?

2019-07-18 Thread LakeShen
Hi community, my question is: does the REST API /jobs/:jobid/yarn-cancel trigger
a savepoint? I looked at the Flink source code and found that it doesn't trigger
a savepoint; is that right?
Thank you for your reply.


[jira] [Created] (FLINK-13289) Blink Planner JDBCUpsertTableSink : UnsupportedOperationException "JDBCUpsertTableSink can not support "

2019-07-16 Thread LakeShen (JIRA)
LakeShen created FLINK-13289:


 Summary: Blink Planner JDBCUpsertTableSink : 
UnsupportedOperationException "JDBCUpsertTableSink can not support "
 Key: FLINK-13289
 URL: https://issues.apache.org/jira/browse/FLINK-13289
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.9.0
Reporter: LakeShen


Hi, in the flink-jdbc connector module I changed the Flink planner to the Blink 
planner to run all the test cases, because we want to use the Blink planner in 
our program. When I test the JDBCUpsertTableSinkITCase class, the testUpsert 
method throws the exception:
{color:red}java.lang.UnsupportedOperationException: JDBCUpsertTableSink can not 
support {color}
Looking at the source code: with the Flink planner, the StreamPlanner sets the 
JDBCUpsertTableSink's keyFields, but with the Blink planner I didn't find 
anywhere that sets the JDBCUpsertTableSink's keyFields. So keyFields is null, 
and when JDBCUpsertTableSink newFormat() is executed,
it throws the exception.




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-13283) JDBCLookup Exception: Unsupported type: LocalDate

2019-07-16 Thread LakeShen (JIRA)
LakeShen created FLINK-13283:


 Summary: JDBCLookup Exception: Unsupported type: LocalDate
 Key: FLINK-13283
 URL: https://issues.apache.org/jira/browse/FLINK-13283
 Project: Flink
  Issue Type: Bug
Reporter: LakeShen


Hi, when I use the Flink 1.9 JDBCTableSource and create a TableSchema like this:
final TableSchema schema = TableSchema.builder()
.field("id", DataTypes.INT())
.field("create", DataTypes.DATE())
.field("update", DataTypes.DATE())
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
.field("address", DataTypes.STRING())
.field("birthday",DataTypes.DATE())
.field("likethings", DataTypes.STRING())
.build();
I use JDBCTableSource.builder() to create the JDBCTableSource. When I run the 
program, there is an exception:
{color:red}java.lang.IllegalArgumentException: Unsupported type: 
LocalDate{color}
Looking at the source code, I find that LegacyTypeInfoDataTypeConverter converts 
DateType to Types.LOCAL_DATE, but in the JDBCTypeUtil class the TYPE_MAPPING 
HashMap doesn't have the key Types.LOCAL_DATE, so the exception is thrown.
Does the JDBC dimension table support temporal data like Date?



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Re: When will the Flink 1.9 docs be published?

2019-07-04 Thread LakeShen
Thank you

Tzu-Li (Gordon) Tai  于2019年7月4日周四 下午5:54写道:

> Hi,
>
> There is already document available for 1.9-SNAPSHOT [1].
> The documentation for the official stable release will not be finalized
> until 1.9 is actually released. Feature freeze for 1.9.0 is happening this
> week, so ideally this should be available near end of July.
>
> Cheers,
> Gordon
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/
>
> On Thu, Jul 4, 2019 at 5:49 PM LakeShen  wrote:
>
> > Hi community,I want to know when publish the flink 1.9 document? Thanks.
> >
>


When will the Flink 1.9 docs be published?

2019-07-04 Thread LakeShen
Hi community, I want to know when the Flink 1.9 documentation will be published. Thanks.


Re: [ANNOUNCE] Jincheng Sun is now part of the Flink PMC

2019-06-24 Thread LakeShen
Congratulations! Jincheng Sun

Best,
LakeShen

Robert Metzger  于2019年6月24日周一 下午11:09写道:

> Hi all,
>
> On behalf of the Flink PMC, I'm happy to announce that Jincheng Sun is now
> part of the Apache Flink Project Management Committee (PMC).
>
> Jincheng has been a committer since July 2017. He has been very active on
> Flink's Table API / SQL component, as well as helping with releases.
>
> Congratulations & Welcome Jincheng!
>
> Best,
> Robert
>