Abnormal data when Flink writes to MySQL

2023-03-22 Thread 小昌同学
I'm using Flink SQL to join multiple tables in real time and write the result to MySQL, where a composite primary key is defined. Looking at the logs, why does the same record end up in the MySQL table twice, once as an INSERT
and once as a DELETE? What I want is upsert behavior; how should I set that up?
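
A minimal sketch of the usual way to get upsert semantics with the JDBC connector, assuming made-up table and column names (they are not from the original job): declare the same composite key as a PRIMARY KEY ... NOT ENFORCED on the Flink sink table, which switches the connector from append mode to upsert mode.

-- Sketch only; names, types and connection options are placeholders.
CREATE TABLE mysql_sink (
  order_id STRING,
  item_id  STRING,
  amount   DECIMAL(10, 2),
  PRIMARY KEY (order_id, item_id) NOT ENFORCED  -- same composite key as the MySQL table
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'orders',
  'username' = '...',
  'password' = '...'
);

-- The join result is then written with a plain INSERT INTO.
INSERT INTO mysql_sink
SELECT o.order_id, i.item_id, i.amount
FROM orders_stream AS o
JOIN items_stream AS i ON o.order_id = i.order_id;

With the MySQL dialect the upserts are typically issued as INSERT ... ON DUPLICATE KEY UPDATE, so successive changelog versions of the same key overwrite each other instead of showing up as separate insert/delete rows.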


小昌同学
ccc0606fight...@163.com

Re: Re: Re: What should I do if the SQL bug I reported is not being handled?

2023-03-22 Thread Jeff
This approach works, thanks a lot.

At 2023-03-22 17:11:19, "Jane Chan" wrote:
>Hi,
>
>As mentioned in the earlier reply, if you don't want to switch versions, on 1.15 you can try manually casting the 'abc' field to varchar to work around the problem
>map ['msg_code','0', 'msg_reason', cast('abc' as string)]
>
>If you don't want to modify the SQL, the only option for now is to build the release-1.17 branch yourself; see [1] for build instructions
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/
>
>Best regards!
>Jane
>
>On Wed, Mar 22, 2023 at 6:04 PM Jeff  wrote:
>
>> By reading the Calcite 1.27.0 source code I found that it has already been fixed, but I'm using Flink 1.15, which can't use 1.27.0 directly, so is a locally built version my only option?
>>
>> At 2023-03-22 10:41:42, "Shuo Cheng" wrote:
>> >Hi,
>> >
>> >If you know where the problem is, feel free to open a PR yourself.
>> >
>> >Sincerely,
>> >Shuo
>> >
>> >On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
>> >
>> >> Just copy and run the two SQL statements I provided and it will definitely reproduce!
>> >> Every Flink version has this problem, because they all use Calcite 1.26.0.
>> >> The problem is caused by that version of Calcite.
>> >>
>> >> At 2023-03-22 09:28:17, "Jeff" wrote:
>> >> >Bug link:
>> >> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
>> >> >
>> >> >
>> >> >Bug details:
>> >> >the values of map are truncated by the CASE WHEN function.
>> >> >// sql
>> >> >create table test (a map) with ('connector'='print');
>> >> >insert into test select * from (values(case when true then map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] end));
>> >> >
>> >> >the result:
>> >> >
>> >> >+I[{test=123}]
>> >> >
>> >> >We hope the value of result is '123456789', but I get '123', the length is limited by 'abc'.
>> >>
>>


Re: Job hanging taking savepoint on legacy Flink

2023-03-22 Thread Shammon FY
Hi Le

The problem looks like there are not enough memory segments in the task
manager where the source wants to emit result data.

You can check the usage of the network buffer pool in your web UI and try to
increase its size if it is not enough. You can find more information about
the network buffer in the docs [1]

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/network_mem_tuning/
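
As a rough illustration of that advice (values are made up, and the option names below are the ones used by recent Flink releases; on a 1.4-era cluster the equivalent keys are taskmanager.network.memory.fraction/min/max, so check the docs for your exact version):

# flink-conf.yaml: give the network buffer pool more room (example values only)
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.min: 128mb
taskmanager.memory.network.max: 2gb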

Best,
Shammon FY


On Wed, Mar 22, 2023 at 1:20 PM Le Xu  wrote:

> Hello!
>
> I would like to run a legacy Flink project on top of an old Flink version
> (1.4.1), and I'm getting an error when trying to cancel a job with a savepoint.
> Specifically, it reports the following error on requestBuffer:
>
> My understanding would be that the savepoint operation probably requires
> all outstanding messages to be processed, which somehow requires larger
> buffer space (not entirely sure about this). However, it seems that my job
> has no problem processing regular messages as long as I'm not cancelling it
> with a savepoint. I have also reduced the "web.backpressure.refresh-interval"
> to 100 to force it to check back pressure frequently, but it still leads to
> this error.
>
> I am aware that I'd probably get more configuration knobs by running a
> newer version of Flink but this particular version has some particular
> modified functionalities I want to try. Any suggestions?
>
>
> 2023-03-21 23:04:59,718 WARN org.apache.flink.runtime.taskmanager.Task - Task 'Source: bid-source -> Filter -> Flat Map -> flatmap-timestamp (10/32)' did not react to cancelling signal, but is stuck in method:
> java.lang.Object.wait(Native Method)
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:222)
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:191)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:146)
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:106)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:88)
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.processElement(TimestampsAndPunctuatedWatermarksOperator.java:52)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:552)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:527)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:507)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> ch.ethz.systems.strymon.ds2.flink.nexmark.queries.KeyedHighestBidCount$2.flatMap(KeyedHighestBidCount.java:245)
> ch.ethz.systems.strymon.ds2.flink.nexmark.queries.KeyedHighestBidCount$2.flatMap(KeyedHighestBidCount.java:173)
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:552)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:527)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:507)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:552)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:527)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:507)
> 

Re: org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException

2023-03-22 Thread Shammon FY
Hi Ajinkya

It seems that the JobMaster was not started successfully. If possible, could
you provide more information to help analyze the problem?

Best,
Shammon FY


On Wed, Mar 22, 2023 at 10:57 AM Ajinkya Pathrudkar <
ajinkya.pathrudka...@gmail.com> wrote:

>
> I am writing to inform you of a recent update we made to our Flink
> version, upgrading from 1.14 to 1.15, along with a shift from Java 8 to
> Java 11. Since the update, we are seeing the exception below.
>
> We would appreciate any insights you may have regarding this issue, and
> any suggestions on how to proceed.
>
> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException
>
> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
> Could not send message
> [LocalRpcInvocation(ResourceManagerGateway.registerJobManager(JobMasterId,
> ResourceID, String, JobID, Time))] from sender [unknown] to recipient
> [akka.tcp://fl...@xx.xxx.7.6:46017/user/rpc/resourcemanager_2], because
> the recipient is unreachable. This can either mean that the recipient has
> been terminated or that the remote RpcService is currently not reachable.
> Could not send message
> [LocalRpcInvocation(ResourceManagerGateway.registerJobManager(JobMasterId,
> ResourceID, String, JobID, Time))] from sender [unknown] to recipient
> [akka.tcp://fl...@xx.xxx.7.6:46017/user/rpc/resourcemanager_2], because
> the recipient is unreachable. This can either mean that the recipient has
> been terminated or that the remote RpcService is currently not reachable.
> Recipient [Actor[akka://flink/user/rpc/resourcemanager_2#1878283075]] had
> already been terminated. Message of type
> [org.apache.flink.runtime.rpc.messages.LocalFencedMessage].
>
>
> Thanks,
> Ajinkya
>


Re: Unsubscribe

2023-03-22 Thread Shammon FY
To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org


On Wed, Mar 22, 2023 at 8:13 PM jianbo zhang  wrote:

> Unsubscribe
>


Re: Unsubscribe

2023-03-22 Thread Ran Tao
To unsubscribe, just send an email to user-zh-unsubscr...@flink.apache.org.

Best Regards,
Ran Tao


李朋 <1134415...@qq.com.invalid> wrote on Wed, Mar 22, 2023 at 20:10:

> Unsubscribe!


Unsubscribe

2023-03-22 Thread jianbo zhang
Unsubscribe


Re: Handling JSON Serialization without Kryo

2023-03-22 Thread Rion Williams
Hi Ken,

I'm going to profile the job today to try and get a better handle on where the bottleneck is. The job currently just passes around JsonObjects between the operators, which are relying on Kryo. The job also writes to Postgres, Kafka, and Elasticsearch, so it's possible that one of those is causing the back-pressure. I'm a bit shocked at the stunningly low speeds as well. Initially, the job would perform fine, but checkpointing sizes would gradually build up (as would durations for them) until performance degraded to the borderline unusable 1-2 records/second.

On Mar 21, 2023, at 2:35 PM, Ken Krugler wrote:

Hi Rion,

I'm using Gson to deserialize to a Map.

1-2 records/second sounds way too slow, unless each record is enormous.

— Ken

On Mar 21, 2023, at 6:18 AM, Rion Williams wrote:

Hi Ken,

Thanks for the response. I hadn't tried exploring the use of the Record class, which I'm assuming refers to flink.types.Record, to read the JSON into. Did you handle this via using a mapper to read the properties in (e.g. Gson, Jackson) as fields, or take a different approach? Additionally, how has your experience been with performance? Kryo with the existing job leveraging JsonObjects (via Gson) is horrific (~1-2 records/second) and can't keep up with the speed of the producers, which is the impetus behind reevaluating the serialization.

I'll explore this a bit more.

Thanks,
Rion

On Mon, Mar 20, 2023 at 10:28 PM Ken Krugler wrote:

Hi Rion,

For my similar use case, I was able to make a simplifying assumption that my top-level JSON object was a record. I then registered a custom Kryo serde that knew how to handle the handful of JsonPrimitive types for the record entries. I recently looked at extending that to support arrays and nested records, but haven't had to do that.

— Ken

On Mar 20, 2023, at 6:56 PM, Rion Williams wrote:

Hi Shammon,

Unfortunately it's a data stream job. I've been exploring a few options but haven't found anything I've decided on yet. I'm currently looking at seeing if I can leverage some type of partial serialization to bind to the properties that I know the job will use and retain the rest as a JSON blob. I've also considered trying to store the fields as a large map of string-object pairs and translating that into a string prior to writing to the sinks. Still accepting any/all ideas that I come across to see if I can handle this in an efficient, reasonable way.

Thanks,
Rion

On Mar 20, 2023, at 8:40 PM, Shammon FY wrote:

Hi Rion,

Is your job datastream or table/sql? If it is a table/sql job, and you can define all the fields in JSON you need, then you can directly use the json format [1] to parse the data. You can also write custom UDFs to parse JSON data into structured data, such as map, row and other types supported by Flink.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/

Best,
Shammon FY

On Sun, Mar 19, 2023 at 7:44 AM Rion Williams wrote:

Hi all,

I’m reaching out today for some suggestions (and hopefully a solution) for a Flink job that I’m working on. The job itself reads JSON strings from a Kafka topic and reads those into JSONObjects (currently via Gson), which are then operated against, before ultimately being written out to Kafka again.

The problem here is that the shape of the data can vary wildly and dynamically. Some records may have properties unique to only that record, which makes defining a POJO difficult. In addition to this, the JSONObjects fall back to Kryo serialization, which is leading to atrocious throughput.

I basically need to read in JSON strings, enrich properties on these objects, and ultimately write them to various sinks. Is there some type of JSON-based class or library or an approach I could use to accomplish this in an efficient manner? Or possibly a way to partially write a POJO that would allow me to interact with sections/properties of the JSON while retaining other properties that might be dynamically present or unique to the message?

Any advice or suggestions would be welcome! I’ll also be happy to provide any additional context if it would help!

Thanks,

Rion

(cross-posted to users+dev for reach)

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch




--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
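
As a rough sketch of the table/SQL route Shammon mentioned above (topic, field names and types are invented for illustration, not taken from Rion's job): with the built-in json format, the declared columns are parsed straight from the JSON, so Kryo never enters the picture. The catch is that the schema has to be known up front, which is exactly what varies in this use case.

-- Hypothetical source table using the json format instead of Kryo-serialized JsonObjects.
CREATE TABLE events (
  id      STRING,
  user_id STRING,
  payload ROW<action STRING, amount DOUBLE>,  -- known nested structure
  tags    MAP<STRING, STRING>                 -- loosely typed key/value properties
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
);

Undeclared JSON properties are simply dropped by the format, so this only fits when the fields the job needs are stable.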




(no subject)

2023-03-22 Thread 李朋


Re: Re: Re: What should I do if the SQL bug I reported is not being handled?

2023-03-22 Thread Jeff
Tried it; it's not compatible. Even 1.27.0 is not compatible.

At 2023-03-22 18:04:17, "tison" wrote:
>If the interfaces at the Calcite layer haven't changed, simply swapping the jar might also work? I'm not sure whether there are incompatibilities going from 1.27 -> 1.29.
>
>Best,
>tison.
>
>
Jane Chan wrote on Wed, Mar 22, 2023 at 18:11:
>
>> Hi,
>>
>> As mentioned in the earlier reply, if you don't want to switch versions, on 1.15 you can try manually casting the 'abc' field to varchar to work around the problem
>> map ['msg_code','0', 'msg_reason', cast('abc' as string)]
>>
>> If you don't want to modify the SQL, the only option for now is to build the release-1.17 branch yourself; see [1] for build instructions
>>
>> [1]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/
>>
>> Best regards!
>> Jane
>>
>> On Wed, Mar 22, 2023 at 6:04 PM Jeff  wrote:
>>
>> > By reading the Calcite 1.27.0 source code I found that it has already been fixed, but I'm using Flink 1.15, which can't use 1.27.0 directly, so is a locally built version my only option?
>> >
>> > At 2023-03-22 10:41:42, "Shuo Cheng" wrote:
>> > >Hi,
>> > >
>> > >If you know where the problem is, feel free to open a PR yourself.
>> > >
>> > >Sincerely,
>> > >Shuo
>> > >
>> > >On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
>> > >
>> > >> Just copy and run the two SQL statements I provided and it will definitely reproduce!
>> > >> Every Flink version has this problem, because they all use Calcite 1.26.0.
>> > >> The problem is caused by that version of Calcite.
>> > >>
>> > >> At 2023-03-22 09:28:17, "Jeff" wrote:
>> > >> >Bug link:
>> > >> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
>> > >> >
>> > >> >
>> > >> >Bug details:
>> > >> >the values of map are truncated by the CASE WHEN function.
>> > >> >// sql
>> > >> >create table test (a map) with ('connector'='print');
>> > >> >insert into test select * from (values(case when true then map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] end));
>> > >> >
>> > >> >the result:
>> > >> >
>> > >> >+I[{test=123}]
>> > >> >
>> > >> >We hope the value of result is '123456789', but I get '123', the length is limited by 'abc'.
>> > >>
>> >
>>


Re: Re: What should I do if the SQL bug I reported is not being handled?

2023-03-22 Thread tison
If the interfaces at the Calcite layer haven't changed, simply swapping the jar might also work? I'm not sure whether there are incompatibilities going from 1.27 -> 1.29.

Best,
tison.


Jane Chan wrote on Wed, Mar 22, 2023 at 18:11:

> Hi,
>
> As mentioned in the earlier reply, if you don't want to switch versions, on 1.15 you can try manually casting the 'abc' field to varchar to work around the problem
> map ['msg_code','0', 'msg_reason', cast('abc' as string)]
>
> If you don't want to modify the SQL, the only option for now is to build the release-1.17 branch yourself; see [1] for build instructions
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/
>
> Best regards!
> Jane
>
> On Wed, Mar 22, 2023 at 6:04 PM Jeff  wrote:
>
> > By reading the Calcite 1.27.0 source code I found that it has already been fixed, but I'm using Flink 1.15, which can't use 1.27.0 directly, so is a locally built version my only option?
> >
> > At 2023-03-22 10:41:42, "Shuo Cheng" wrote:
> > >Hi,
> > >
> > >If you know where the problem is, feel free to open a PR yourself.
> > >
> > >Sincerely,
> > >Shuo
> > >
> > >On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
> > >
> > >> Just copy and run the two SQL statements I provided and it will definitely reproduce!
> > >> Every Flink version has this problem, because they all use Calcite 1.26.0.
> > >> The problem is caused by that version of Calcite.
> > >>
> > >> At 2023-03-22 09:28:17, "Jeff" wrote:
> > >> >Bug link:
> > >> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
> > >> >
> > >> >
> > >> >Bug details:
> > >> >the values of map are truncated by the CASE WHEN function.
> > >> >// sql
> > >> >create table test (a map) with ('connector'='print');
> > >> >insert into test select * from (values(case when true then map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] end));
> > >> >
> > >> >the result:
> > >> >
> > >> >+I[{test=123}]
> > >> >
> > >> >We hope the value of result is '123456789', but I get '123', the length is limited by 'abc'.
> > >>
> >
>


Re: Bootstrapping multiple state within same operator

2023-03-22 Thread David Artiga
Not familiar with the implementation but thinking some options:

- composable transformations
- underlying MultiMap
- ...

On Wed, Mar 22, 2023 at 10:24 AM Hang Ruan  wrote:

> Hi, David,
> I also read the code of `SavepointWriter#withOperator`. The
> transformations are stored in a `Map` whose key is `OperatorID`. I can't
> see a way to register multiple transformations for one operator with the
> provided API.
>
> Maybe we need a new type of `XXXStateBootstrapFunction` to change multiple
> states at one time.
>
> Best,
> Hang
>
> David Artiga wrote on Wed, Mar 22, 2023 at 15:22:
>
>> We are using the state processor API to
>> bootstrap the state of some operators. It has been working fine until now,
>> when we tried to bootstrap an operator that has both a keyed state and a
>> broadcasted state. Seems the API does not provide a convenient method to
>> apply multiple transformations on the same *uid...*
>>
>> Is there a way to do that and we just missed it? Any insights appreciated.
>>
>> Cheers,
>> /David
>>
>


Re: Re: What should I do if the SQL bug I reported is not being handled?

2023-03-22 Thread Jane Chan
Hi,

As mentioned in the earlier reply, if you don't want to switch versions, on 1.15 you can try manually casting the 'abc' field to varchar to work around the problem
map ['msg_code','0', 'msg_reason', cast('abc' as string)]

If you don't want to modify the SQL, the only option for now is to build the release-1.17 branch yourself; see [1] for build instructions

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/

Best regards!
Jane

On Wed, Mar 22, 2023 at 6:04 PM Jeff  wrote:

> By reading the Calcite 1.27.0 source code I found that it has already been fixed, but I'm using Flink 1.15, which can't use 1.27.0 directly, so is a locally built version my only option?
>
> At 2023-03-22 10:41:42, "Shuo Cheng" wrote:
> >Hi,
> >
> >If you know where the problem is, feel free to open a PR yourself.
> >
> >Sincerely,
> >Shuo
> >
> >On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
> >
> >> Just copy and run the two SQL statements I provided and it will definitely reproduce!
> >> Every Flink version has this problem, because they all use Calcite 1.26.0.
> >> The problem is caused by that version of Calcite.
> >>
> >> At 2023-03-22 09:28:17, "Jeff" wrote:
> >> >Bug link:
> >> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
> >> >
> >> >
> >> >Bug details:
> >> >the values of map are truncated by the CASE WHEN function.
> >> >// sql
> >> >create table test (a map) with ('connector'='print');
> >> >insert into test select * from (values(case when true then map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] end));
> >> >
> >> >the result:
> >> >
> >> >+I[{test=123}]
> >> >
> >> >We hope the value of result is '123456789', but I get '123', the length is limited by 'abc'.
> >>
>


Re: Re: What should I do if the SQL bug I reported is not being handled?

2023-03-22 Thread Jeff
By reading the Calcite 1.27.0 source code I found that it has already been fixed, but I'm using Flink 1.15, which can't use 1.27.0 directly, so is a locally built version my only option?

At 2023-03-22 10:41:42, "Shuo Cheng" wrote:
>Hi,
>
>If you know where the problem is, feel free to open a PR yourself.
>
>Sincerely,
>Shuo
>
>On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
>
>> Just copy and run the two SQL statements I provided and it will definitely reproduce!
>> Every Flink version has this problem, because they all use Calcite 1.26.0.
>> The problem is caused by that version of Calcite.
>>
>> At 2023-03-22 09:28:17, "Jeff" wrote:
>> >Bug link:
>> >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
>> >
>> >
>> >Bug details:
>> >the values of map are truncated by the CASE WHEN function.
>> >// sql
>> >create table test (a map) with ('connector'='print');
>> >insert into test select * from (values(case when true then map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] end));
>> >
>> >the result:
>> >
>> >+I[{test=123}]
>> >
>> >We hope the value of result is '123456789', but I get '123', the length is limited by 'abc'.
>>


Re: Setting state TTL separately for each stream in a regular join

2023-03-22 Thread Jane Chan
Hi,

I've started a discussion in the community about setting state TTL at operator granularity [1], which would support setting TTL separately for each stream. You're welcome to join the discussion :)

[1] https://lists.apache.org/thread/ffmc96gv8ofoskbxlhtm7w8oxv8nqzct

Best,
Jane
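
Until that lands, the only knob available in released versions is the pipeline-wide idle-state retention, which applies the same TTL to every stateful operator rather than per input stream. A sketch in SQL-client syntax, with an example value only:

-- applies to the whole job, not to a single join input
SET 'table.exec.state.ttl' = '1 h';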

On Wed, Feb 15, 2023 at 1:26 PM Jane Chan  wrote:

> Hi,
>
> Flink SQL does not yet support setting state TTL separately for each stream, but the community plans to support this feature; a FLIP will be proposed soon, and you're welcome to join the discussion.
>
> Best regards,
> Jane
>
> On Wed, Feb 15, 2023 at 11:13 AM Jason_H  wrote:
>
>> Hi all,
>> I've run into a problem. When doing a two-stream join with Flink SQL, I use a regular join between a data stream and a dimension-table stream. Now I'd like to set a TTL for the data stream only. How can I do that? The Flink version is 1.15.2.
>> join,分别是一条数据流,一条维表流,现在我想单独给数据流设置ttl,请教一下可以怎么做,flink版本是1.15.2
>>
>>
>> Jason_H
>> hyb_he...@163.com
>
>


Re: Bootstrapping multiple state within same operator

2023-03-22 Thread Hang Ruan
Hi, David,
I also read the code of `SavepointWriter#withOperator`. The
transformations are stored in a `Map` whose key is `OperatorID`. I can't
see a way to register multiple transformations for one operator with the
provided API.

Maybe we need a new type of `XXXStateBootstrapFunction` to change multiple
states at one time.

Best,
Hang

David Artiga wrote on Wed, Mar 22, 2023 at 15:22:

> We are using the state processor API to
> bootstrap the state of some operators. It has been working fine until now,
> when we tried to bootstrap an operator that has both a keyed state and a
> broadcasted state. Seems the API does not provide a convenient method to
> apply multiple transformations on the same *uid...*
>
> Is there a way to do that and we just missed it? Any insights appreciated.
>
> Cheers,
> /David
>


Bootstrapping multiple state within same operator

2023-03-22 Thread David Artiga
We are using the state processor API to
bootstrap the state of some operators. It has been working fine until now,
when we tried to bootstrap an operator that has both a keyed state and a
broadcasted state. Seems the API does not provide a convenient method to
apply multiple transformations on the same *uid...*

Is there a way to do that and we just missed it? Any insights appreciated.

Cheers,
/David


Re: What should I do if the SQL bug I reported is not being handled?

2023-03-22 Thread Jane Chan
Hi,

This is a Calcite bug [1] that has been fixed in 1.27.0. However, since Flink 1.15.1, 1.15.2 and 1.16.1
all depend on Calcite 1.26.0, the only option for now is the workaround below; once release-1.17 is out, you can upgrade to the new version and the problem should be gone.

select * from (values(case when true then map['test','123456789'] else
map ['msg_code','0', 'msg_reason', cast('abc' as string)] end));


[1] https://issues.apache.org/jira/browse/CALCITE-4603

Best,
Jane

On Wed, Mar 22, 2023 at 11:49 AM tison  wrote:

> You can keep an eye on the release progress and test the RC
> https://lists.apache.org/thread/d9o0tgnv0fl9goqsdo8wmq9121b9wolv
>
> Best,
> tison.
>
>
> tison wrote on Wed, Mar 22, 2023 at 11:47:
>
> > The Calcite version on Flink master is 1.29; it looks like it will come out with the Flink 1.17 release
> >
> > Best,
> > tison.
> >
> >
>> Shuo Cheng wrote on Wed, Mar 22, 2023 at 11:42:
> >
> >> Hi,
> >>
>> >> If you know where the problem is, feel free to open a PR yourself.
> >>
> >> Sincerely,
> >> Shuo
> >>
> >> On Wed, Mar 22, 2023 at 11:23 AM Jeff  wrote:
> >>
>> >> > Just copy and run the two SQL statements I provided and it will definitely reproduce!
>> >> > Every Flink version has this problem, because they all use Calcite 1.26.0.
>> >> > The problem is caused by that version of Calcite.
>> >> >
>> > At 2023-03-22 09:28:17, "Jeff" wrote:
>> > >Bug link:
> >> > >https://issues.apache.org/jira/browse/FLINK-31375?filter=-2
> >> > >
> >> > >
>> > >Bug details:
>> > >the values of map are truncated by the CASE WHEN function.
>> > >// sql
>> > >create table test (a map) with ('connector'='print');
>> > >insert into test select * from (values(case when true then map['test','123456789'] else map ['msg_code','0', 'msg_reason', 'abc'] end));
>> > >
>> > >the result:
>> > >
>> > >+I[{test=123}]
>> > >
>> > >We hope the value of result is '123456789', but I get '123', the length is limited by 'abc'.
> >> >
> >>
> >
>