[jira] [Created] (FLINK-33963) There is only one UDF instance after serializing the same task

2024-01-01 Thread lifengchao (Jira)
lifengchao created FLINK-33963:
--

 Summary: There is only one UDF instance after serializing the same 
task
 Key: FLINK-33963
 URL: https://issues.apache.org/jira/browse/FLINK-33963
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.18.0
 Environment: local environment in an IDEA test.

Java 8
Reporter: lifengchao
 Fix For: 1.18.0


I define this UDF and expect the following SQL to return 'a' and 'b', but it 
returns 'a' and 'a'.

```java
public class UdfSerializeFunc extends ScalarFunction {
    static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
    String cache;

    @Override
    public void open(FunctionContext context) throws Exception {
        LOG.warn("open:{}.", this.hashCode());
    }

    public String eval(String a, String b) {
        if (cache == null) {
            LOG.warn("cache_null.cache:{}", b);
            cache = b;
        }
        return cache;
    }
}
```

```
select
  name,
  udf_ser(name, 'a') name1,
  udf_ser(name, 'b') name2
from heros
```

Changing the UDF to the following achieves the expected results.

```java
public class UdfSerializeFunc2 extends ScalarFunction {
    static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
    String cache;

    @Override
    public void open(FunctionContext context) throws Exception {
        LOG.warn("open:{}.", this.hashCode());
    }

    public String eval(String a, String b) {
        if (cache == null) {
            LOG.warn("cache_null.cache:{}", b);
            cache = b;
        }
        return cache;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .outputTypeStrategy(new TypeStrategy() {
                    @Override
                    public Optional<DataType> inferType(CallContext callContext) {
                        List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
                        if (argumentDataTypes.size() != 2) {
                            throw callContext.newValidationError("arg size error");
                        }
                        if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
                            throw callContext.newValidationError("Literal expected for second argument.");
                        }
                        cache = callContext.getArgumentValue(1, String.class).get();
                        return Optional.of(DataTypes.STRING());
                    }
                })
                .build();
    }
}
```
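
A workaround sketch that sidesteps the problem entirely, assuming the shared instance comes from the two calls having identical serialized representations: avoid caching the literal in a mutable field at all (class name is illustrative, not part of the report):

```java
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical stateless variant: with no mutable instance state, the result
// stays correct even if the planner backs both calls with one instance.
public class UdfNoSharedState extends ScalarFunction {
    public String eval(String a, String b) {
        return b; // derive the result from the argument on every call
    }
}
```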

 

My complete test code:

```
public class UdfSerializeFunc extends ScalarFunction {
    static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
    String cache;

    @Override
    public void open(FunctionContext context) throws Exception {
        LOG.warn("open:{}.", this.hashCode());
    }

    public String eval(String a, String b) {
        if (cache == null) {
            LOG.warn("cache_null.cache:{}", b);
            cache = b;
        }
        return cache;
    }
}

public class UdfSerializeFunc2 extends ScalarFunction {
    static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
    String cache;

    @Override
    public void open(FunctionContext context) throws Exception {
        LOG.warn("open:{}.", this.hashCode());
    }

    public String eval(String a, String b) {
        if (cache == null) {
            LOG.warn("cache_null.cache:{}", b);
            cache = b;
        }
        return cache;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .outputTypeStrategy(new TypeStrategy() {
                    @Override
                    public Optional<DataType> inferType(CallContext callContext) {
                        List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
                        if (argumentDataTypes.size() != 2) {
                            throw callContext.newValidationError("arg size error");
                        }
                        if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
                            throw callContext.newValidationError("Literal expected for second argument.");
                        }
                        cache = callContext.getArgumentValue(1, String.class).get();
                        return Optional.of(DataTypes.STRING());
                    }
                })
                .build();
    }
}

class UdfSerializeSuite extends AnyFunSuite with BeforeAndAfterAll {
  var env: StreamExecutionEnvironment = _
  var tEnv: StreamTableEnvironment = _

  override protected def beforeAll(): Unit = {
    val conf = new Configuration()
    env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setParallelism(2)
    env.getConfig.enableObjectReuse()

    tEnv = StreamTableEnvironment.create(env)
  }

  /**
   * Two tasks, and each task holds only one UDF instance: udf_ser(name, 'a')
   * and udf_ser(name, 'b') are not kept apart. The serialization here is
   * baffling: after serialization, the two udf_ser calls within a single task
   * are still the same object, not two. Mutating the UDF's fields in
   * getTypeInference yields two distinct objects.
   */
  test("UdfSerializeFunc") {
    tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc])

    var sql = """
    CREATE TEMPORARY TABLE heros (
      `name` STRING,
      `power` STRING,
      `age` INT
    ) WITH (
      'connector' = 'faker',
      'fields.name.expression' = '#\{superhero.name}',
      'fields.power.expression' = '#\{superhero.power}',
      'fields.power.null-rate' = '0.05',
      'rows-per-second' = '1',
      'fields.age.expression' = '#\{number.numberBetween ''0'',''1000''}'
    )
    """
    tEnv.executeSql(sql)

    sql = """
    select
      udf_ser(name, 'a') name1,
      udf_ser(name, 'b') name2
    from heros
    """
    val rstTable = tEnv.sqlQuery(sql)
    rstTable.printSchema()

    rstTable.execute().print()
  }

  /**
   * Mutating the ScalarFunction's fields makes the deserialized instances distinct.
   */
  test("UdfSerializeFunc2") {
    tEnv.createTemporarySystemFunction("udf_ser", classOf[UdfSerializeFunc2])

    var sql = """
    CREATE 

[jira] [Created] (FLINK-33962) Chaining-agnostic OperatorID generation for improved state compatibility on parallelism change

2024-01-01 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-33962:
-

 Summary: Chaining-agnostic OperatorID generation for improved 
state compatibility on parallelism change
 Key: FLINK-33962
 URL: https://issues.apache.org/jira/browse/FLINK-33962
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Zhanghao Chen


*Background*

Flink restores operator state from snapshots by matching operatorIDs. 
Since Flink 1.2, {{StreamGraphHasherV2}} has been used for operatorID 
generation when no user-set uid exists. The generated OperatorID is 
deterministic with respect to:
 * node-local properties (the traversal ID in the BFS over the DAG)
 * chained output nodes
 * hashes of the input nodes

*Problem*

The chaining behavior affects state compatibility, because the OperatorID of 
an op depends on its chained output nodes. For example, a simple source->sink 
DAG with source and sink chained together is state incompatible with an 
otherwise identical DAG where source and sink are unchained (either because 
the parallelisms of the two ops were changed to be unequal or because chaining 
was disabled). This greatly limits the flexibility to break or join chains for 
performance tuning.

*Proposal*

Introduce {{StreamGraphHasherV3}}, which is agnostic to the chaining behavior 
of operators; it effectively just removes L227-235 of 
[flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
 at master · apache/flink 
(github.com)|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java].
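
As a rough illustration (a hypothetical sketch, not the actual Flink hasher code), a chaining-agnostic hash would mix in only the node-local properties and the input-node hashes:

```java
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import java.util.List;

// Simplified, hypothetical sketch of a chaining-agnostic operator hash.
// Unlike StreamGraphHasherV2, the chained output nodes are NOT mixed in,
// so breaking or joining chains leaves the OperatorID unchanged.
final class ChainingAgnosticHashSketch {
    static byte[] generateHash(int traverseId, List<byte[]> inputHashes) {
        Hasher hasher = Hashing.murmur3_128().newHasher();
        hasher.putInt(traverseId);          // node-local property (BFS id)
        for (byte[] inputHash : inputHashes) {
            hasher.putBytes(inputHash);     // upstream dependencies only
        }
        return hasher.hash().asBytes();
    }
}
```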
 

This will not hurt the determinism of ID generation across job submissions as 
long as the stream graph topology doesn't change, and since newer versions of 
Flink have already adopted pure operator-level state recovery, it will not 
break state recovery across job submissions as long as both submissions use 
the same hasher.

This will, however, break cross-version state compatibility. We can therefore 
introduce a new option to enable HasherV3 in v1.19 and consider making it the 
default hasher in v2.0.

Looking forward to suggestions on this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [Discuss] FLIP-407: Improve Flink Client performance in interactive scenarios

2024-01-01 Thread Yangze Guo
Thanks for driving this, Xiangyu!

+1 for the overall proposal. I believe this will enhance the
availability and competitiveness of Flink in OLAP scenarios.

Best,
Yangze Guo

On Tue, Dec 26, 2023 at 4:51 PM xiangyu feng  wrote:
>
> Hi devs,
>
> I'm opening this thread to discuss FLIP-407: Improve Flink Client
> performance in interactive scenarios. The POC test results and design doc
> can be found at: FLIP-407.
>
> Currently, Flink Client is mainly designed for one-time interaction with
> the Flink Cluster. All the resources (HTTP connections, threads, HA
> services) and instances (ClusterDescriptor, ClusterClient, RestClient) are
> created and recycled for each interaction. This works well when users do
> not need to interact frequently with the Flink Cluster, and it also saves
> resource usage since resources are recycled immediately after each use.
>
> However, in OLAP or StreamingWarehouse scenarios, users might submit
> interactive jobs to a dedicated Flink Session Cluster very often. In this
> case, we find that short queries that finish in less than 1s on the
> Flink Cluster still have an E2E latency greater than 2s. Hence, we
> propose this FLIP to improve the Flink Client performance in this scenario.
> This could also improve the user experience when using session debug mode.
>
> The major change in this FLIP is that there will be a newly introduced option
> *'execution.interactive-client'*. When this option is enabled, Flink
> Client will reuse all the necessary resources to improve interactive
> performance, including: HA Services, HTTP connections, threads and all
> kinds of instances related to a long-running Flink Cluster. The default
> value of this option will be false, in which case Flink Client will behave
> as before.
>
> Also, this FLIP proposes a configurable RetryStrategy when fetching results
> from the client side to the Flink Cluster. In interactive scenarios, this can save
> more than 15% of TM CPU usage without performance degradation.
>
> Looking forward to your feedback, thanks.
>
> Best regards,
> Xiangyu
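
The RetryStrategy API itself isn't spelled out in this thread; purely as an illustration of the configurable fetch-retry idea, an exponential-backoff loop could look like the hypothetical sketch below (all names are illustrative, not the FLIP's actual API):

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical sketch of a configurable retry loop for result fetching;
// backing off between polls is what saves TM CPU versus a tight loop.
final class FetchRetrySketch {
    static <T> T fetchWithBackoff(
            Supplier<Optional<T>> fetchOnce, int maxAttempts, Duration initialBackoff)
            throws InterruptedException {
        Duration backoff = initialBackoff;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Optional<T> result = fetchOnce.get(); // one poll of the cluster
            if (result.isPresent()) {
                return result.get();
            }
            Thread.sleep(backoff.toMillis());
            backoff = backoff.multipliedBy(2);    // exponential backoff
        }
        throw new IllegalStateException("no result after " + maxAttempts + " attempts");
    }
}
```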


Re: [VOTE] FLIP-398: Improve Serialization Configuration And Usage In Flink

2024-01-01 Thread weijie guo
+1(binding)

Best regards,

Weijie


Zhu Zhu  wrote on Wed, Dec 27, 2023 at 18:54:

> +1 (binding)
>
> Thanks,
> Zhu
>
> Zhanghao Chen  wrote on Wed, Dec 27, 2023 at 15:41:
>
> > +1 (non-binding)
> >
> > Best,
> > Zhanghao Chen
> > 
> > From: Yong Fang 
> > Sent: Wednesday, December 27, 2023 14:54
> > To: dev 
> > Subject: [VOTE] FLIP-398: Improve Serialization Configuration And Usage
> In
> > Flink
> >
> > Hi devs,
> >
> > Thanks for all feedback about the FLIP-398: Improve Serialization
> > Configuration And Usage In Flink [1] which has been discussed in [2].
> >
> > I'd like to start a vote for it. The vote will be open for at least 72
> > hours unless there is an objection or insufficient votes.
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-398%3A+Improve+Serialization+Configuration+And+Usage+In+Flink
> > [2] https://lists.apache.org/thread/m67s4qfrh660lktpq7yqf9docvvf5o9l
> >
> > Best,
> > Fang Yong
> >
>


Re: [DISCUSS] FLIP-405: Migrate string configuration key to ConfigOption

2024-01-01 Thread Xuannan Su
Hi all,

Thank you for all your comments! The FLIP has been updated
accordingly. Please let me know if you have any further questions or
comments.

Also, note that many people are on Christmas break, so we will keep
the discussion open for another week.

Best,
Xuannan

On Wed, Dec 27, 2023 at 5:20 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> > After some investigation, it turns out those options of input/output
> > format are only publicly exposed in the DataSet docs[2], which is
> > deprecated. Thus, marking them as deprecated and removed in Flink 2.0
> > looks fine to me.
>
> Thanks Xuannan for the detailed investigation. If so, deprecating them
> and removing them in Flink 2.0 looks good to me.
>
> > I think the key of LOCAL_NUMBER_TASK_MANAGER is better as
> > 'minicluster.number-of-taskmanagers' or 'minicluster.taskmanager-number'
> > instead of 'minicluster.number-taskmanager'.
>
> Thanks Hang for the good suggestion! 'minicluster.number-of-taskmanagers'
> sounds good to me, it's similar to taskmanager.numberOfTaskSlots.
>
> Best,
> Rui
>
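
As a concrete illustration of the migration target discussed above, here is a hedged sketch of the ConfigOption pattern; the exact key, default, and description are illustrative, pending the FLIP:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hedged sketch of the ConfigOption pattern the FLIP migrates to; the
// key, default, and description shown here are illustrative, not final.
public final class MiniClusterOptionsSketch {
    public static final ConfigOption<Integer> LOCAL_NUMBER_TASK_MANAGER =
            ConfigOptions.key("minicluster.number-of-taskmanagers")
                    .intType()
                    .defaultValue(1)
                    .withDescription("Number of TaskManagers to start in the MiniCluster.");
}
// Usage: conf.get(LOCAL_NUMBER_TASK_MANAGER) replaces the string-keyed getter.
```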
> On Wed, Dec 27, 2023 at 1:56 PM Hang Ruan  wrote:
>>
>> Hi, Rui Fan.
>>
>> Thanks for this FLIP.
>>
>> I think the key of LOCAL_NUMBER_TASK_MANAGER is better as
>> 'minicluster.number-of-taskmanagers' or 'minicluster.taskmanager-number'
>> instead of 'minicluster.number-taskmanager'.
>>
>> Best,
>> Hang
>>
>> Xuannan Su  wrote on Wed, Dec 27, 2023 at 12:40:
>>
>> > Hi Xintong and Rui,
>> >
>> > Thanks for the quick feedback and the suggestions.
>> >
>> > > 1. I think the default value for `TASK_MANAGER_LOG_PATH_KEY` should be
>> > "no
>> > > default".
>> >
>> > I have considered both ways of describing the default value. However,
>> > I found out that some of the configurations, such as `web.tmpdir`, put
>> > `System.getProperty()` in the default value [1]. Some are putting the
>> > description in the default value column[2]. So I just picked the first
>> > one. I am fine with either way, so long as they are consistent. WDYT?
>> >
>> > > 3. Simply saying "getting / setting value with string key is discouraged"
>> > > in JavaDoc of get/setString is IMHO a bit confusing. People may have the
>> > > question why would we keep the discouraged interfaces at all. I would
>> > > suggest the following:
>> > > ```
>> > > We encourage users and developers to always use ConfigOption for getting
>> > /
>> > > setting the configurations if possible, for its rich description, type,
>> > > default-value and other supports. The string-key-based getter / setter
>> > > should only be used when ConfigOption is not applicable, e.g., the key is
>> > > programmatically generated in runtime.
>> > > ```
>> >
>> > The suggested comment looks good to me. Thanks for the suggestion. I
>> > will update the comment in the FLIP.
>> >
>> > > 2. So I wonder if we can simply mark them as deprecated and remove in
>> > 2.0.
>> >
>> > After some investigation, it turns out those options of input/output
>> > format are only publicly exposed in the DataSet docs[2], which is
>> > deprecated. Thus, marking them as deprecated and removed in Flink 2.0
>> > looks fine to me.
>> >
>> >
>> > @Rui
>> >
>> > > Configuration has a `public <T> T get(ConfigOption<T> option)` method.
>> > > Could we remove all `Xxx getXxx(ConfigOption<Xxx> configOption)` methods?
>> >
>> > +1 Only keep the get(ConfigOption<T> option),
>> > getOptional(ConfigOption<T> option), and set(ConfigOption<T> option, T
>> > value) methods.
>> >
>> > Best,
>> > Xuannan
>> >
>> > [1]
>> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#web-tmpdir
>> > [2]
>> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#kubernetes-container-image-ref
>> > [3]
>> > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/dataset/overview/#data-sources
>> >
>> >
>> >
>> >
>> > On Tue, Dec 26, 2023 at 8:47 PM Xintong Song 
>> > wrote:
>> > >
>> > > >
>> > > > Configuration has a `public <T> T get(ConfigOption<T> option)` method.
>> > > > Could we remove all `Xxx getXxx(ConfigOption<Xxx> configOption)`
>> > methods?
>> > >
>> > >
>> > >
>> > > Note: all `public void setXxx(ConfigOption<Xxx> key, Xxx value)` methods
>> > > > can be replaced with `public <T> Configuration set(ConfigOption<T>
>> > option,
>> > > > T value)` as well.
>> > >
>> > >
>> > > +1
>> > >
>> > >
>> > > Best,
>> > >
>> > > Xintong
>> > >
>> > >
>> > >
>> > > On Tue, Dec 26, 2023 at 8:44 PM Xintong Song 
>> > wrote:
>> > >
>> > > > These features don't have a public option, but they work. I'm not sure
>> > > >> whether these features are used by some advanced users.
>> > > >> Actually, I think some of them are valuable! For example:
>> > > >>
>> > > >> - ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE
>> > > >>   allows users to define the start command of the yarn container.
>> > > >> - FileInputFormat.ENUMERATE_NESTED_FILES_FLAG allows a
>> > > >>   Flink job to read all files under a directory even if it has nested
>> > > >> subdirectories.
>> > > >>
>> > > >> This FLIP focuses on the 

Re: [DISCUSS] FLIP-406: Reorganize State & Checkpointing & Recovery Configuration

2024-01-01 Thread Rui Fan
Hi Zakelly,

I'm not sure whether we could add the state backend type in the
new key name of state.backend.incremental. It means we use
`execution.checkpointing.rocksdb-incremental` or
`execution.checkpointing.rocksdb-incremental.enabled`.

So far, state.backend.incremental only works for the RocksDB state backend, 
and this optimization is extremely valuable for large-state Flink jobs. I 
believe it's enabled for most production Flink jobs with large RocksDB state.

If this option isn't generic for all state backend types, I guess we
can enable `execution.checkpointing.rocksdb-incremental.enabled`
by default in Flink 2.0.

But if it works for all state backends, it's hard to enable it by default.
Enabling valuable features and improvements by default is useful for users,
especially the many new Flink users. Out-of-the-box options are good for users.

WDYT?

Best,
Rui
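
To make the naming question concrete, below is a rough sketch of the two option shapes under debate in this thread; the keys and defaults are illustrative only, not proposals:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical sketch of the two option shapes under discussion.
public final class CheckpointNamingSketch {
    // Enumeration style (Option 1): one key, several values.
    public enum CheckpointType { FULL, INCREMENTAL }

    public static final ConfigOption<CheckpointType> CHECKPOINT_TYPE =
            ConfigOptions.key("execution.checkpointing.type")
                    .enumType(CheckpointType.class)
                    .defaultValue(CheckpointType.FULL);

    // Boolean style with 'enabled' suffix (Options 2/3).
    public static final ConfigOption<Boolean> INCREMENTAL_ENABLED =
            ConfigOptions.key("execution.checkpointing.incremental.enabled")
                    .booleanType()
                    .defaultValue(false);
}
```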

On Fri, Dec 29, 2023 at 1:45 PM Zakelly Lan  wrote:

> Hi everyone,
>
> Thanks all for your comments!
>
> As many of you have questions about the names for boolean options, I
> suggest we make a naming rule for them. For now I could think of three
> options:
>
> Option 1: Use enumeration options if possible. But this may cause some name
> collisions or confusion as we discussed and we should unify the statement
> everywhere.
> Option 2: Use boolean options and add 'enabled' as the suffix.
> Option 3: Use boolean options and ONLY add 'enabled' when there are more
> detailed configurations under the same prefix, to prevent one name from
> serving as a prefix to another.
>
> I am slightly inclined to Option 3, since it is more in line with current
> practice and friendly for existing users. Also It reduces the length of
> configuration names as much as possible. I really want to hear your
> opinions.
>
>
> @Xuannan
>
> I agree with your comments 1 and 3.
>
> For 2, If we decide to change the name, maybe
> `execution.checkpointing.parallel-cleaner` is better? And as for whether to
> add 'enabled' I suggest we discuss the rule above. WDYT?
> Thanks!
>
>
> Best,
> Zakelly
>
> On Fri, Dec 29, 2023 at 12:02 PM Xuannan Su  wrote:
>
> > Hi Zakelly,
> >
> > Thanks for driving this! The organization of the configuration option
> > in the FLIP looks much cleaner and easier to understand. +1 to the
> > FLIP.
> >
> > Just some questions from me.
> >
> > 1. I think the change to the ConfigOptions should be put in the
> > `Public Interface` section, instead of `Proposed Changed`, as those
> > configuration options are public interface.
> >
> > 2. The key `state.checkpoint.cleaner.parallel-mode` seems confusing.
> > It feels like it is used to choose different modes. In fact, it is a
> > boolean flag to indicate whether to enable parallel clean. How about
> > making it `state.checkpoint.cleaner.parallel-mode.enabled`?
> >
> > 3. The `execution.checkpointing.write-buffer` may better be
> > `execution.checkpointing.write-buffer-size` so that we know it is
> > configuring the size of the buffer.
> >
> > Best,
> > Xuannan
> >
> >
> > On Wed, Dec 27, 2023 at 7:17 PM Yanfei Lei  wrote:
> > >
> > > Hi Zakelly,
> > >
> > > > Considering the name occupation, how about naming it as
> > `execution.checkpointing.type`?
> > >
> > > `Checkpoint Type`[1,2] is used to describe aligned/unaligned
> > > checkpoint, I am inclined to make a choice between
> > > `execution.checkpointing.incremental` and
> > > `execution.checkpointing.incremental.enabled`.
> > >
> > >
> > > [1]
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/monitoring/checkpoint_monitoring/
> > > [2]
> >
> https://github.com/apache/flink/blob/master/flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html#L27
> > >
> > > --
> > > Best,
> > > Yanfei
> > >
> > > Zakelly Lan  wrote on Wed, Dec 27, 2023 at 14:41:
> > > >
> > > > Hi Lijie,
> > > >
> > > > Thanks for the reminder! I missed this.
> > > >
> > > > Considering the name occupation, how about naming it as
> > > > `execution.checkpointing.type`?
> > > >
> > > > Actually I think the current `execution.checkpointing.mode` is
> > confusing in
> > > > some ways, maybe `execution.checkpointing.data-consistency` is
> better.
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > >
> > > > On Wed, Dec 27, 2023 at 12:59 PM Lijie Wang <
> wangdachui9...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Zakelly,
> > > > >
> > > > > >> I'm wondering if `execution.checkpointing.savepoint-dir` would
> be
> > > > > better.
> > > > >
> > > > > `execution.checkpointing.dir` and
> > `execution.checkpointing.savepoint-dir`
> > > > > are also fine for me.
> > > > >
> > > > > >> So I think an enumeration option `execution.checkpointing.mode`
> > which
> > > > > can be 'full' (default) or 'incremental' would be better
> > > > >
> > > > > I agree with using an enumeration option. But currently there is
> > already a
> > > > > configuration option called `execution.checkpointing.mode`, which
> is
> > used
> > 

[jira] [Created] (FLINK-33961) Hybrid Shuffle may hang when exclusive buffers per channel is set to 0

2024-01-01 Thread Jiang Xin (Jira)
Jiang Xin created FLINK-33961:
-

 Summary: Hybrid Shuffle may hang when exclusive buffers per 
channel is set to 0
 Key: FLINK-33961
 URL: https://issues.apache.org/jira/browse/FLINK-33961
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Jiang Xin


I found that Hybrid Shuffle without the new mode enabled may hang when 
exclusive-buffers-per-channel is set to 0. It can be reproduced by adding the 
following test to 
HybridShuffleITCase.java and running it.
{code:java}
@RepeatedTest(10)
void testHybridFullExchangesWithNonBuffersPerChannel() throws Exception {
    final int numRecordsToSend = 1;
    Configuration configuration = configureHybridOptions(getConfiguration(), false);
    configuration.set(
            NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_ENABLE_NEW_MODE, false);
    configuration.set(NETWORK_BUFFERS_PER_CHANNEL, 0);
    JobGraph jobGraph = createJobGraph(numRecordsToSend, false, configuration);
    executeJob(jobGraph, configuration, numRecordsToSend);
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-397: Add config options for administrator JVM options

2024-01-01 Thread Zhanghao Chen
Hi Xiangyu,

The proposed new options are targeted at experienced Flink platform 
administrators rather than normal end users, and a one-to-one mapping from 
each non-default option to its default-option variant might be easier for users 
to understand. Also, although JM and TM tend to use the same set of JVM args 
most of the time, there are cases where different sets of JVM args are 
preferable. So I am leaning towards the current design, WDYT?

Best,
Zhanghao Chen
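
As a rough sketch of the prepend semantics (option names follow the FLIP; the merge code itself is illustrative, not the actual implementation):

```java
// Hypothetical sketch: administrator defaults come first, user-set opts
// follow, so later JVM flags can override earlier ones where the JVM
// resolves conflicts by last-one-wins.
final class JvmOptsMergeSketch {
    static String effectiveOpts(String adminDefaults, String userOpts) {
        return String.join(" ", adminDefaults, userOpts).trim();
    }

    public static void main(String[] args) {
        // env.java.default-opts.jobmanager + env.java.opts.jobmanager
        System.out.println(
                effectiveOpts("-XX:+UseG1GC -Xlog:gc*", "-XX:MaxMetaspaceSize=512m"));
    }
}
```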

From: xiangyu feng 
Sent: Friday, December 29, 2023 20:20
To: dev@flink.apache.org 
Subject: Re: [DISCUSS] FLIP-397: Add config options for administrator JVM 
options

Hi Zhanghao,

Thanks for driving this. +1 for the overall idea.

One minor question, do we need separate administrator JVM options for both
JobManager and TaskManager? Or just one administrator JVM option for all?

I'm afraid the 6 JVM
options (env.java.opts.all\env.java.default-opts.all\env.java.opts.jobmanager\env.java.default-opts.jobmanager\env.java.opts.taskmanager\env.java.default-opts.taskmanager)
may confuse users.

Regards,
Xiangyu


Yong Fang  wrote on Wed, Dec 27, 2023 at 15:36:

> +1 for this, we have met jobs that need to set GC policies different from
> the default ones to improve performance. Separating the default and
> user-set ones can help us better manage them.
>
> Best,
> Fang Yong
>
> On Fri, Dec 22, 2023 at 9:18 PM Benchao Li  wrote:
>
> > +1 from my side,
> >
> > I also met some scenarios that I wanted to set some JVM options by
> > default for all Flink jobs before, such as
> > '-XX:-DontCompileHugeMethods', without it, some generated big methods
> > won't be optimized in JVM C2 compiler, leading to poor performance.
> >
> > > Zhanghao Chen  wrote on Mon, Nov 27, 2023 at 20:04:
> > >
> > > Hi devs,
> > >
> > > I'd like to start a discussion on FLIP-397: Add config options for
> > administrator JVM options [1].
> > >
> > > In production environments, users typically develop and operate their
> > Flink jobs through a managed platform. Users may need to add JVM options
> to
> > their Flink applications (e.g. to tune GC options). They typically use
> the
> > env.java.opts.x series of options to do so. Platform administrators also
> > have a set of JVM options to apply by default, e.g. to use JVM 17, enable
> > GC logging, or apply pretuned GC options, etc. Both use cases will need
> to
> > set the same series of options and will clobber one another. Similar
> issues
> > have been described in SPARK-23472 [2].
> > >
> > > Therefore, I propose adding a set of default JVM options for
> > administrator use that prepends the user-set extra JVM options.
> > >
> > > Looking forward to hearing from you.
> > >
> > > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-397%3A+Add+config+options+for+administrator+JVM+options
> > > [2] https://issues.apache.org/jira/browse/SPARK-23472
> > >
> > > Best,
> > > Zhanghao Chen
> >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


Re:Re: Re: [DISCUSS] FLIP-392: Deprecate the Legacy Group Window Aggregation

2024-01-01 Thread Xuyang
Hi, Martijn.
Thank you for your reminder :)


My idea is that in the current 1.x versions, we can automatically convert the 
agg operator of the old syntax into the agg operator of the new syntax. Version 
2.0 will introduce breaking changes, and the old syntax can then be deleted 
outright at the code level.
>That would imply that we will never be able to remove the old SQL
>language from the code base, since we would always rewrite that old
>language to the new implementation under the hood.
I'm a little curious why the old syntax couldn't be removed from the code in 
2.0. If you have any better ideas, let's discuss them together.




--

Best!
Xuyang
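
For context, the rewrite under discussion maps the legacy group window syntax onto the window TVF syntax. A hedged sketch of the before/after (table and column names are illustrative; it assumes a registered table `orders` with a time attribute `ts`):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical sketch of the syntax pair the automatic rewrite would map.
public final class GroupWindowRewriteSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Legacy group window aggregation (old syntax, to be deprecated):
        //   SELECT TUMBLE_START(ts, INTERVAL '10' MINUTE), COUNT(*)
        //   FROM orders
        //   GROUP BY TUMBLE(ts, INTERVAL '10' MINUTE)
        // Window TVF aggregation (new syntax, the rewrite target):
        tEnv.executeSql(
                "SELECT window_start, window_end, COUNT(*) "
                        + "FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(ts), INTERVAL '10' MINUTES)) "
                        + "GROUP BY window_start, window_end");
    }
}
```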





At 2023-12-27 23:20:06, "Martijn Visser"  wrote:
>Hi Xuyang,
>
>It's currently the holiday season in Europe so do expect some slow responses.
>
>> The key reason the original FLIP is called "Deprecate the Legacy Group 
>> Window Aggregation" is that we also plan to automatically rewrite the group 
>> window agg corresponding the old syntax into the window agg corresponding 
>> the new window TVF syntax (will provide a fallback option from a 
>> compatibility perspective). Whether the window agg corresponding the new 
>> syntax is actively used by user or automatically rewritten, we all rely on 
>> the alignment of the functionality between the window agg and the legacy 
>> group window agg.
>
>That would imply that we will never be able to remove the old SQL
>language from the code base, since we would always rewrite that old
>language to the new implementation under the hood. I don't think
>that's necessarily a good idea, especially given that Flink 2.0 is
>coming next year and we could make a clean break there.
>
>Best regards,
>
>Martijn
>
>On Thu, Dec 21, 2023 at 12:44 PM Xuyang  wrote:
>>
>> Hi, Timo. Sorry to bother you. There's something I really need to hear your 
>> thoughts on.
>>
>>
>>
>>
>> When I'm trying to split this flip, having reviewed this discussion and the 
>> FLIP document again, I realized that there is still a key issue that hasn't 
>> been clarified. The key reason the original FLIP is called "Deprecate the 
>> Legacy Group Window Aggregation" is that we also plan to automatically 
>> rewrite the group window agg corresponding the old syntax into the window 
>> agg corresponding the new window TVF syntax (will provide a fallback option 
>> from a compatibility perspective). Whether the window agg corresponding the 
>> new syntax is actively used by user or automatically rewritten, we all rely 
>> on the alignment of the functionality between the window agg and the legacy 
>> group window agg.
>>
>>
>>
>>
>> To explain in detail, the original flip has the following two core parts.
>>
>>
>>
>>
>> 1. Automatically rewrite the legacy group window agg into the new window agg 
>> during plan optimization. (Corresponding to Section 5 in the Proposed 
>> Changes of the original FLIP)
>>
>>
>>
>>
>> 2. The alignment subtasks that the rewriting work depends on, involve 
>> aligning the features of the two operators. (No one had objections to this 
>> part of the work, and some of them are WIP) (Corresponding to Section 1-4 in 
>> the Proposed Changes of the original FLIP)
>>
>>
>>
>>
>> Currently, there are two ways to deal with this flip.
>>
>>
>>
>>
>> 1. Following your previous suggestion, split the subtasks for the two 
>> alignment features (supporting CDC streams, and supporting HOP windows whose 
>> size is a non-integer multiple of the step length) into independent FLIPs. 
>> Moreover, an additional FLIP should be added to describe the automatic plan 
>> rewriting work, and the discussion email would associate these three FLIPs 
>> together. I'm hesitant about this because it feels a bit trivial: the first 
>> two FLIPs are actually small features. (And we would need to discuss them 
>> separately and vote on them individually, right?)
>>
>>
>>
>>
>> 2. Modify the original flip title and change the content to the following 
>> style to highlight the automatic plan rewriting.
>>
>>
>>
>>
>> title: FLIP-392: Automatically rewrite the old syntax's agg to the window 
>> TVF agg
>>
>> part 1: automatic plan rewriting
>>
>> part 2: alignment subtasks the rewriting work depends on
>>
>>
>>
>>
>> I'm looking forward to your reply.
>>
>>
>>
>>
>> --
>>
>> Best!
>> Xuyang
>>
>>
>>
>>
>>
>> At 2023-12-19 22:30:27, "Timo Walther"  wrote:
>> >Hi Xuyang,
>> >
>> >sorry I missed the ping. Sounds reasonable to me. One FLIP about
>> >changelog semantics, the other about SQL semantics.
>> >
>> >Regards,
>> >Timo
>> >
>> >On 19.12.23 02:39, Xuyang wrote:
>> >> Hi, Timo. Sorry for this noise.
>> >> What do you think about splitting the flip like this?
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >>
>> >>  Best!
>> >>  Xuyang
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> At 2023-12-15 10:05:32, "Xuyang"  wrote:
>> >>> Hi, Timo, thanks for your advice.
>> >>>
>> >>>
>> >>> I am considering splitting the existing flip into two while leaving the 
>> >>> existing flip (or without).
>> >>> One 

Re: [DISCUSS] FLIP-404: Create a Redis HyperLogLog Connector for Flink

2024-01-01 Thread Jinsui Chen
Hi, Martijn

Thank you for your input. I align with your viewpoint and am fully on board
with the idea. I'll continue contributing and provide assistance wherever
possible.

Best regards,
Jinsui

Martijn Visser  wrote on Wed, Dec 27, 2023 at 23:56:

> Hi Jinsui,
>
> Thanks for opening the discussion and creating the FLIP. My main
> concern isn't so much the connector, it's more that we're talking
> about yet another Redis connector implementation but we actually don't
> have a proper Redis connector overall. Ideally it would be great if we
> could get https://github.com/apache/flink-connector-redis-streams/ in
> proper shape (perhaps you could help with some reviews) and then
> moving all Redis connectors into one mono repo, kind of how it's done
> for AWS connectors.
>
> Best regards,
>
> Martijn
>
> On Tue, Dec 19, 2023 at 6:25 PM Jinsui Chen 
> wrote:
> >
> > Hi, there
> >
> > We would like to start a discussion thread on "FLIP-404: Create a Redis
> > HyperLogLog Connector for Flink"[1].
> >
> > There has also been a lot of discussion in the past about Redis
> connectors,
> > including creating lookup and sink connectors for generic key-value get
> and
> > put, and creating source and sink connectors for streams structures.
> >
> > Different data structures are used in different ways. I think we can
> build
> > generic connectors based on KVs and create special-purpose connectors for
> > specific data structures.
> >
> > For example, for the list structure, end users can specify whether data is
> > pushed to or popped from the left or right side of the list when building
> > the sink or source connector, while for HyperLogLog, end users need to
> > write the data to multiple keys based on the content of a table field.
> >
> > Given the differentiated usage of different data structures, a unified
> > connector may become difficult to maintain because it would carry too much
> > functionality.
> >
> > Based on above, this FLIP is proposed.
> >
> > Looking forward to any comment or feedback.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-404%3A+Create+a+Redis+HyperLogLog+Connector+for+Flink
> >
> > Best regards,
> > Jinsui
>
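
For readers unfamiliar with the structure, the HyperLogLog operations the proposed connector would wrap are small; a hedged Jedis-based sketch (not the connector API from the FLIP, and the key/element names are illustrative):

```java
import redis.clients.jedis.Jedis;

// Hypothetical sketch of the underlying Redis HyperLogLog operations a
// HyperLogLog sink boils down to; not the FLIP's actual connector API.
public final class HllSketch {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // PFADD folds elements into the HLL registers at the given key
            jedis.pfadd("daily-uv:2024-01-01", "user-42", "user-99");
            // PFCOUNT returns the approximate distinct count
            System.out.println(jedis.pfcount("daily-uv:2024-01-01"));
        }
    }
}
```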


Re: [VOTE] Release flink-connector-opensearch v1.1.0, release candidate #1

2024-01-01 Thread Danny Cranmer
Hey,

Gordon, apologies for the delay. Yes, this is the correct understanding; all
connectors follow a similar pattern.

Would appreciate some PMC eyes on this release.

Thanks,
Danny

On Thu, 23 Nov 2023, 23:28 Tzu-Li (Gordon) Tai,  wrote:

> Hi Danny,
>
> Thanks for starting a RC for this.
>
> From the looks of the staged POMs for 1.1.0-1.18, the flink versions for
> Flink dependencies still point to 1.17.1.
>
> My understanding is that this is fine, as those provided scope
> dependencies (e.g. flink-streaming-java) will have their versions
> overwritten by the user POM if they do intend to compile their jobs against
> Flink 1.18.x.
> Can you clarify if this is the correct understanding of how we intend the
> externalized connector artifacts to be published? Related discussion on
> [1].
>
> Thanks,
> Gordon
>
> [1] https://lists.apache.org/thread/x1pyrrrq7o1wv1lcdovhzpo4qhd4tvb4
>
> On Thu, Nov 23, 2023 at 3:14 PM Sergey Nuyanzin 
> wrote:
>
> > +1 (non-binding)
> >
> > - downloaded artifacts
> > - built from source
> > - verified checksums and signatures
> > - reviewed web pr
> >
> >
> > On Mon, Nov 6, 2023 at 5:31 PM Ryan Skraba  >
> > wrote:
> >
> > > Hello! +1 (non-binding) Thanks for the release!
> > >
> > > I've validated the source for the RC1:
> > > * flink-connector-opensearch-1.1.0-src.tgz at r64995
> > > * The sha512 checksum is OK.
> > > * The source file is signed correctly.
> > > * The signature 0F79F2AFB2351BC29678544591F9C1EC125FD8DB is found in
> the
> > > KEYS file, and on https://keyserver.ubuntu.com/
> > > * The source file is consistent with the GitHub tag v1.1.0-rc1, which
> > > corresponds to commit 0f659cc65131c9ff7c8c35eb91f5189e80414ea1
> > > - The files explicitly excluded by create_pristine_sources (such as
> > > .gitignore and the submodule tools/releasing/shared) are not present.
> > > * Has a LICENSE file and a NOTICE file
> > > * Does not contain any compiled binaries.
> > >
> > > * The sources can be compiled and unit tests pass with flink.version
> > 1.17.1
> > > and flink.version 1.18.0
> > >
> > > * Nexus has three staged artifact ids for 1.1.0-1.17 and 1.1.0-1.18
> > > - flink-connector-opensearch (.jar, -javadoc.jar, -sources.jar,
> > > -tests.jar and .pom)
> > > - flink-sql-connector-opensearch (.jar, -sources.jar and .pom)
> > > - flink-connector-gcp-pubsub-parent (only .pom)
> > >
> > > All my best, Ryan
> > >
> > > On Fri, Nov 3, 2023 at 10:29 AM Danny Cranmer  >
> > > wrote:
> > > >
> > > > Hi everyone,
> > > >
> > > > Please review and vote on the release candidate #1 for the version
> > 1.1.0
> > > of
> > > > flink-connector-opensearch, as follows:
> > > > [ ] +1, Approve the release
> > > > [ ] -1, Do not approve the release (please provide specific comments)
> > > >
> > > >
> > > > The complete staging area is available for your review, which
> includes:
> > > > * JIRA release notes [1],
> > > > * the official Apache source release to be deployed to
> dist.apache.org
> > > [2],
> > > > which are signed with the key with fingerprint
> > > > 0F79F2AFB2351BC29678544591F9C1EC125FD8DB [3],
> > > > * all artifacts to be deployed to the Maven Central Repository [4],
> > > > * source code tag v1.1.0-rc1 [5],
> > > > * website pull request listing the new release [6].
> > > >
> > > > The vote will be open for at least 72 hours. It is adopted by
> majority
> > > > approval, with at least 3 PMC affirmative votes.
> > > >
> > > > Thanks,
> > > > Danny
> > > >
> > > > [1]
> > > >
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353141
> > > > [2]
> > > >
> > >
> >
> https://dist.apache.org/repos/dist/dev/flink/flink-connector-opensearch-1.1.0-rc1/
> > > > [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> > > > [4]
> > >
> https://repository.apache.org/content/repositories/orgapacheflink-1666/
> > > > [5]
> > https://github.com/apache/flink-connector-opensearch/tree/v1.1.0-rc1
> > > > [6] https://github.com/apache/flink-web/pull/694
> > >
> >
> >
> > --
> > Best regards,
> > Sergey
> >
>