[jira] [Created] (FLINK-34634) Restarting the job will not read the changelog anymore if it stops before the synchronization of meta information is complete and some table is removed

2024-03-08 Thread Hongshun Wang (Jira)
Hongshun Wang created FLINK-34634:
-

 Summary: Restarting the job will not read the changelog anymore if 
it stops before the synchronization of meta information is complete and some 
table is removed
 Key: FLINK-34634
 URL: https://issues.apache.org/jira/browse/FLINK-34634
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: Hongshun Wang
 Fix For: cdc-3.1.0
 Attachments: image-2024-03-09-15-25-26-187.png, 
image-2024-03-09-15-27-46-073.png

Once, I removed a table from the options and then restarted the job from a 
savepoint, but the job couldn't read the binlog anymore. When I checked the 
logs, I found an ERROR-level log stating:

'The enumerator received invalid request meta group id 6, the valid meta group 
id range is [0, 4].'

It appears that the Reader is requesting more splits than the Enumerator is 
aware of.

However, the code should indeed remove redundant split information from the 
Reader as seen in 
[https://github.com/ververica/flink-cdc-connectors/pull/2292]. So why does this 
issue occur?

!image-2024-03-09-15-25-26-187.png!

Upon examining the code, I discovered the cause. If the job stops before all 
the split meta information has been synchronized and then restarts, this issue 
occurs. Suppose the totalFinishedSplitSize of the binlogSplit in the Reader is 
6 and no meta information has been synchronized yet, leaving the 
finishedSnapshotSplitInfos of the binlogSplit in the Reader empty. After 
restarting, the totalFinishedSplitSize of the binlogSplit in the Reader equals 
(6 - (0 - 0)), which is still 6, but in the Enumerator it is only 4 (the 
removed table had two splits). This can lead to an out-of-range request.
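
For illustration, the mismatch can be written out as simple arithmetic (the 
names below are simplified, hypothetical stand-ins, not the actual flink-cdc 
code):
{code:java}
// Purely illustrative; simplified, hypothetical names rather than flink-cdc classes.
public class MetaGroupMismatchExample {

    public static void main(String[] args) {
        int readerTotalFinishedSplitSize = 6;     // binlog split state written into the savepoint
        int syncedFinishedSnapshotSplitInfos = 0; // no split meta information was synchronized yet
        int splitsOfRemovedTable = 2;             // the removed table contributed two snapshot splits

        // On restore, the Reader cannot subtract anything because its
        // finishedSnapshotSplitInfos list is still empty:
        int readerTotalAfterRestart =
                readerTotalFinishedSplitSize - (syncedFinishedSnapshotSplitInfos - 0); // = 6

        // The Enumerator only counts the splits of the tables that are still configured:
        int enumeratorTotal = readerTotalFinishedSplitSize - splitsOfRemovedTable; // = 4

        System.out.println("Reader requests meta groups up to " + readerTotalAfterRestart
                + ", but the valid range in the Enumerator is [0, " + enumeratorTotal + "]");
    }
}
{code}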

!image-2024-03-09-15-27-46-073.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34633) Support unnesting array constants

2024-03-08 Thread Xingcan Cui (Jira)
Xingcan Cui created FLINK-34633:
---

 Summary: Support unnesting array constants
 Key: FLINK-34633
 URL: https://issues.apache.org/jira/browse/FLINK-34633
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Affects Versions: 1.18.1
Reporter: Xingcan Cui


It seems that the current planner doesn't support using UNNEST on array 
constants.(x)
{code:java}
SELECT * FROM UNNEST(ARRAY[1,2,3]);{code}
 
The following query can't be compiled either. (x)
{code:java}
SELECT * FROM (VALUES('a')) CROSS JOIN UNNEST(ARRAY[1, 2, 3]){code}
 
The rewritten version works. (/)
{code:java}
SELECT * FROM (SELECT *, ARRAY[1,2,3] AS A FROM (VALUES('a'))) CROSS JOIN 
UNNEST(A){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34632) Log checkpoint Id when logging checkpoint processing delay

2024-03-08 Thread Mingliang Liu (Jira)
Mingliang Liu created FLINK-34632:
-

 Summary: Log checkpoint Id when logging checkpoint processing delay
 Key: FLINK-34632
 URL: https://issues.apache.org/jira/browse/FLINK-34632
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.18.1
Reporter: Mingliang Liu


Currently we log a warning message when the checkpoint barrier takes too long 
to start processing. The message includes the delay, but debugging the 
respective checkpoint would be easier if the checkpoint id were also logged.
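
A minimal sketch of the idea (the class, message wording, and variable names 
below are illustrative assumptions, not the actual Flink logging site; it only 
assumes SLF4J on the classpath):
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative only: emits the warning with the checkpoint id included so the
// delay can be correlated with a concrete checkpoint.
public class CheckpointDelayLoggingSketch {

    private static final Logger LOG = LoggerFactory.getLogger(CheckpointDelayLoggingSketch.class);

    static void logProcessingDelay(long checkpointId, long delayMillis, long thresholdMillis) {
        if (delayMillis > thresholdMillis) {
            LOG.warn(
                    "Checkpoint {} barrier took {} ms to start processing (threshold: {} ms).",
                    checkpointId,
                    delayMillis,
                    thresholdMillis);
        }
    }

    public static void main(String[] args) {
        logProcessingDelay(42L, 1_500L, 1_000L);
    }
}
{code}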



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP Suggestion: Externalize Kudu Connector from Bahir

2024-03-08 Thread Ferenc Csaky
Thank you Jeyhun, Leonard, and Hang for your comments! Let me
address them from earliest to latest.

> How do you plan the review process in this case (e.g. incremental
over existing codebase or cumulative all at once) ?

I think incremental would be less time-consuming and complex for
reviewers, so I would lean towards that direction. I would
imagine multiple subtasks for migrating the existing code and
updating the deprecated interfaces, so those should be separate PRs, and the 
release can be initiated when everything is merged.

> (1) About the release version, could you specify kudu connector version 
> instead of flink version 1.18 as external connector version is different with 
> flink?
> (2) About the connector config options, could you enumerate these options so 
> that we can review they’re reasonable or not?

I added these to the FLIP and copied the current config options as is, PTAL.

> (3) Metrics is also key part of connector, could you add the supported 
> connector metrics to public interface as well?

The current Bahir connector code does not include any metrics, and I did not 
plan to include them in the scope of this FLIP.

> I think that how to state this code originally lived in Bahir may be in the
FLIP.

I might be missing your point, but the FLIP contains this: "Migrating the current 
code keeping the history and noting it explicitly it was forked from the Bahir 
repository [2]." Please share if you meant something else.

Best,
Ferenc



On Friday, March 8th, 2024 at 10:42, Hang Ruan  wrote:

> 
> 
> Hi, Ferenc.
> 
> Thanks for the FLIP discussion. +1 for the proposal.
> I think that how to state this code originally lived in Bahir may be in the
> FLIP.
> 
> Best,
> Hang
> 
> Leonard Xu xbjt...@gmail.com wrote on Thursday, March 7, 2024 at 14:14:
> 
> > Thanks Ferenc for kicking off this discussion, I left some comments here:
> > 
> > (1) About the release version, could you specify kudu connector version
> > instead of flink version 1.18 as external connector version is different
> > with flink ?
> > 
> > (2) About the connector config options, could you enumerate these options
> > so that we can review they’re reasonable or not?
> > 
> > (3) Metrics is also key part of connector, could you add the supported
> > connector metrics to public interface as well?
> > 
> > Best,
> > Leonard
> > 
> > > On March 6, 2024 at 11:23 PM, Ferenc Csaky ferenc.cs...@pm.me.INVALID wrote:
> > > 
> > > Hello devs,
> > > 
> > > Opening this thread to discuss a FLIP [1] about externalizing the Kudu
> > > connector, as recently
> > > the Apache Bahir project was moved to the attic [2]. Some details were
> > > discussed already
> > > in another thread [3]. I am proposing to externalize this connector and
> > > keep it maintainable,
> > > and up to date.
> > > 
> > > Best regards,
> > > Ferenc
> > > 
> > > [1]
> > > https://docs.google.com/document/d/1vHF_uVe0FTYCb6PRVStovqDeqb_C_FKjt2P5xXa7uhE
> > > [2] https://bahir.apache.org/
> > > [3] https://lists.apache.org/thread/2nb8dxxfznkyl4hlhdm3vkomm8rk4oyq


[jira] [Created] (FLINK-34631) Memory leak in pyflink when using state with RocksDB

2024-03-08 Thread Mark Lidenberg (Jira)
Mark Lidenberg created FLINK-34631:
--

 Summary: Memory leak in pyflink when using state with RocksDB 
 Key: FLINK-34631
 URL: https://issues.apache.org/jira/browse/FLINK-34631
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.18.1
Reporter: Mark Lidenberg


I have had issues with memory constantly growing on pyflink task managers, 
which should not really happen when we use RocksDB as our state backend. 

I've made a simple example to demonstrate the memory leak. In this example I 
update the state with a 1 MB value for each key and then sleep for 1 second. 
Memory grows by 1 MB per second, as if the state value stays in memory. The 
same thing happens if I send 100 messages per second with 10 KB each. Memory 
keeps growing indefinitely. I've also tested `MapState`, and it behaves the 
same way.

 

```python 
import time

import psutil

from pyflink.common import Types
from pyflink.datastream import (
    EmbeddedRocksDBStateBackend,
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
)
from pyflink.datastream.state import ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(
            ValueStateDescriptor(
                name="my_state",
                value_type_info=Types.STRING(),
            )
        )

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        print("Processing", value, "Memory: ", 
round(psutil.Process().memory_info().rss / 1024 / 1024, 2), "MB")

        # Processing 1 Memory:  171.25 MB -> Processing 2 Memory:  172.12 MB -> 
... grows 1mb per second, which should not happen because we use RocksDB as 
state backend
        self.state.update("a" * 1_000_000)  # 1 mb of data per second
        time.sleep(1.0)


if __name__ == "__main__":
    # - Create flink environment

    environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # - Make sure to use RocksDB as state backend

    environment.set_state_backend(EmbeddedRocksDBStateBackend())

    # - Create pipeline

    (
        environment.from_collection(
            collection=list(range(3600 * 12)),
        )
        .key_by(lambda value: value)
        .process(Processor())
    )

    # - Execute pipeline

    environment.execute(job_name="memory_leak_test")

```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34630) Pulsar source lost topic subscribe

2024-03-08 Thread WangMinChao (Jira)
WangMinChao created FLINK-34630:
---

 Summary: Pulsar source lost topic subscribe
 Key: FLINK-34630
 URL: https://issues.apache.org/jira/browse/FLINK-34630
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: pulsar-3.0.1
Reporter: WangMinChao


The partition id of a non-partitioned Pulsar topic is `-1`. When the Pulsar 
source subscribes to multiple non-partitioned topics, some topic subscriptions 
may be lost.
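
For illustration only (not the connector code): Pulsar reports the same 
partition index, -1, for every non-partitioned topic, which may be related to 
the lost subscriptions described above. A minimal sketch, assuming 
pulsar-common is on the classpath:
{code:java}
import org.apache.pulsar.common.naming.TopicName;

// Illustrative only: every non-partitioned topic has partition index -1.
public class NonPartitionedTopicIdExample {

    public static void main(String[] args) {
        TopicName a = TopicName.get("persistent://public/default/topic-a");
        TopicName b = TopicName.get("persistent://public/default/topic-b");

        System.out.println(a.getPartitionIndex()); // -1
        System.out.println(b.getPartitionIndex()); // -1
    }
}
{code}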





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34629) Pulsar source lost topic subscribe

2024-03-08 Thread WangMinChao (Jira)
WangMinChao created FLINK-34629:
---

 Summary: Pulsar source lost topic subscribe
 Key: FLINK-34629
 URL: https://issues.apache.org/jira/browse/FLINK-34629
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: pulsar-3.0.1
Reporter: WangMinChao


The partition id of a non-partitioned Pulsar topic is `-1`. When the Pulsar 
source subscribes to multiple non-partitioned topics, some topic subscriptions 
may be lost.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34628) Pulsar source lost topic subscribe

2024-03-08 Thread WangMinChao (Jira)
WangMinChao created FLINK-34628:
---

 Summary: Pulsar source lost topic subscribe
 Key: FLINK-34628
 URL: https://issues.apache.org/jira/browse/FLINK-34628
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: pulsar-3.0.1
Reporter: WangMinChao


The partition id of a non-partitioned Pulsar topic is `-1`. When the Pulsar 
source subscribes to multiple non-partitioned topics, some topic subscriptions 
may be lost.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34626) Pulsar source lost topic subscribe

2024-03-08 Thread WangMinChao (Jira)
WangMinChao created FLINK-34626:
---

 Summary: Pulsar source lost topic subscribe
 Key: FLINK-34626
 URL: https://issues.apache.org/jira/browse/FLINK-34626
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: pulsar-3.0.1
Reporter: WangMinChao


The partition id of a non-partitioned Pulsar topic is `-1`. When the Pulsar 
source subscribes to multiple non-partitioned topics, some topic subscriptions 
may be lost.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34627) Pulsar source lost topic subscribe

2024-03-08 Thread WangMinChao (Jira)
WangMinChao created FLINK-34627:
---

 Summary: Pulsar source lost topic subscribe
 Key: FLINK-34627
 URL: https://issues.apache.org/jira/browse/FLINK-34627
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: pulsar-3.0.1
Reporter: WangMinChao


The partition id of a non-partitioned Pulsar topic is `-1`. When the Pulsar 
source subscribes to multiple non-partitioned topics, some topic subscriptions 
may be lost.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34625) TTL doesn't seem to work in pyflink

2024-03-08 Thread Mark Lidenberg (Jira)
Mark Lidenberg created FLINK-34625:
--

 Summary: TTL doesn't seem to work in pyflink 
 Key: FLINK-34625
 URL: https://issues.apache.org/jira/browse/FLINK-34625
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.18.1
 Environment: Image used: flink:1.18.1-scala_2.12-java11
Reporter: Mark Lidenberg


I've made a simple example to test the TTL and couldn't get the expected 
results. I went further and replicated this example in Java, and it worked just 
fine. There is an inconsistency in behavior, so there is something wrong in 
pyflink or in my pyflink setup. 

Here is the code to reproduce. In the example I create a state with a TTL of 
1 second and then process events every 1.5 seconds, printing the current state. 
I expect it to print `None, None, None, ...` (because the TTL expires during 
the 1.5-second sleep), but instead it prints `None, "state", "state", ...`. In 
Java it works as expected and prints `null, null, ...`.

```python

import time

from pyflink.common import Time, Types
from pyflink.datastream import (
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
)
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )

        state_descriptor.enable_time_to_live(
            ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )

        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        # Print current state
        print(self.state.value())
        # expect to print `None` all the time, but prints:
        # `None, 'state', 'state', ...` instead

        # Update state
        self.state.update("state")

        # sleep to reset the state
        time.sleep(1.5)


if __name__ == "__main__":
    # Init environment
    environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # Setup pipeline
    (
        environment.from_collection(
            collection=list(range(10)),
        )
        .key_by(lambda value: 0)
        .process(Processor())
    )

    # Execute pipeline
    environment.execute("ttl_test")

```

 

```java

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.time.LocalDateTime;

public class Processor extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> state;

    @Override
    public void open(Configuration parameters) {
        var stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
        stateDescriptor.enableTimeToLive(stateTtlConfig);
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(String event, Context context, Collector<String> collector)
            throws IOException, InterruptedException {
        // print state
        System.out.println(state.value()); // prints `null, null, ...` as expected

        // update state
        state.update(LocalDateTime.now().toString());

        // sleep to reset the state
        Thread.sleep(1500);
    }
}

```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Externalized Google Cloud Connectors

2024-03-08 Thread Martijn Visser
Hi Claire,

I don't think it's a good idea to actually develop outside of Apache;
contributions that have happened outside of the Apache realm do not play a
role when evaluating potential new committers. I think the best course of
action would be to create a FLIP to add these connectors to the ASF, while
trying to find one or two committers in the Flink project that are willing
to help with the reviews. Would that be possible?

Best regards,

Martijn

On Thu, Feb 15, 2024 at 12:39 PM Claire McCarthy
 wrote:

> Hi Alexander,
>
> Thanks so much for the info!
>
> It sounds like the best path forward is for us to develop outside of Apache
> while, in parallel, working to gain committer status. Our goal will be to
> eventually move anything we build under the Apache umbrella once we're more
> plugged in to the community.
>
> As for migrating the existing Pub/Sub connector to the new Source API, we
> actually have somebody currently building a new Pub/Sub connector from
> scratch (using the new Source API). Once that is ready, we will make sure
> to get that new implementation moved under Apache and help with the
> migration effort.
>
> Thanks again for the response and I'm sure we will be chatting soon!
>
> Best,
> Claire
>
> On Wed, Feb 14, 2024 at 7:36 AM Alexander Fedulov <
> alexander.fedu...@gmail.com> wrote:
>
> > Hi Claire,
> >
> > Thanks for reaching out. It's great that there is interest from Google
> > in spearheading the development of the respective Flink connectors.
> >
> > As of now, there is only one GCP-specific connector developed directly as
> > part of ASF Flink, namely the Pub/Sub one. It has already been
> > externalized here [1].
> > Grouping further connectors under apache/flink-connectors-gcp makes
> sense,
> > but
> > it would be nice to first understand which GCP connectors you plan to add
> > before we create this new umbrella project.
> >
> > I do not think establishing a dedicated workgroup to help with the
> > GCP-specific
> > development is a realistic goal, though. The development will most
> probably
> > take
> > place on the regular ASF best effort basis (which involves mailing list
> > discussions,
> > reaching out to people for reviews, etc.) until your developers gain
> > committer status
> > and can work more independently.
> >
> > One immediate open item where the Flink community would definitely
> > appreciate your
> > help is with the migration of the existing Pub/Sub connector to the new
> > Source API.
> > As you can see here [2], it is one of the two remaining connectors where
> we
> > have not
> > yet made progress, and it seems like a great place to start the
> > collaboration.
> > Flink 2.0 aims to remove the SourceFunction API, which the current
> Pub/Sub
> > connector
> > relies on. It would be great if your colleagues could assist with this
> > effort [3].
> >
> > Best,
> > Alexander Fedulov
> >
> > [1] https://github.com/apache/flink-connector-gcp-pubsub
> > [2] https://issues.apache.org/jira/browse/FLINK-28045
> > [3] https://issues.apache.org/jira/browse/FLINK-32673
> >
> >
> >
> > On Tue, 13 Feb 2024 at 17:25, Claire McCarthy
> >  wrote:
> >
> > > Hi Devs!
> > >
> > > I’d like to kick off a discussion on setting up a repo for a new fleet
> of
> > > Google Cloud connectors.
> > >
> > > A bit of context:
> > >
> > > - We have a team of Google engineers who are looking to build/maintain
> > >   5-10 GCP connectors for Flink.
> > > - We are wondering if it would make sense to host our connectors under
> > >   the ASF umbrella following a similar repo structure as AWS
> > >   (https://github.com/apache/flink-connector-aws). In our case:
> > >   apache/flink-connectors-gcp.
> > > - Currently, we have no Flink committers on our team. We are actively
> > >   involved in the Apache Beam community and have a number of ASF members
> > >   on the team.
> > >
> > >
> > > We saw that one of the original motivations for externalizing
> connectors
> > > was to encourage more activity and contributions around connectors by
> > > easing the contribution overhead. We understand that the decision was
> > > ultimately made to host the externalized connector repos under the ASF
> > > organization. For the same reasons (release infra, quality assurance,
> > > integration with the community, etc.), we would like all GCP connectors
> > to
> > > live under the ASF organization.
> > >
> > > We want to ask the Flink community what you all think of this idea, and
> > > what would be the best way for us to go about contributing something
> like
> > > this. We are excited to contribute and want to learn and follow your
> > > practices.
> > >
> > > A specific issue we know of is that our changes need approval from
> Flink
> > > committers. Do you have a suggestion for how best to go about a new
> > > contribution like ours from a team that does not have committers? Is it
> > > possible, for example, to partner 

Re: [DISCUSS] FLIP-419: Optimize multi-sink query plan generation

2024-03-08 Thread Martijn Visser
Hi Jeyhun Karimov,

I see that you've already opened up a VOTE thread, but since you're talking
about having a prototype already and results, I wondered if you could
include the POC and how you've tested these results in the FLIP?

Best regards,

Martijn

On Tue, Jan 30, 2024 at 4:47 AM Jeyhun Karimov  wrote:

> Hi devs,
>
> I just wanted to give an update on this FLIP.
> I updated the doc based on the comments from Jim.
> Also, I developed a prototype and did some testing.
>
> In my small prototype I ran the following tests:
>
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks1
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks2
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks3
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks4
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinks5
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksWithUDTF
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion1
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion2
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion3
> - org.apache.flink.table.planner.plan.stream.sql.DagOptimizationTest#testMultiSinksSplitOnUnion4
>
>
> These tests are e2e DAG optimization tests, including query parsing,
> validation, optimization, and checking the results.
>
> In these e2e optimization tests, my prototype was 15-20% faster than the
> existing Flink optimization structure (with the "cost" of simplifying the
> codebase).
>
>
> Any questions/comments are more than welcome.
>
>
> Regards,
>
> Jeyhun Karimov
>
> On Wed, Jan 17, 2024 at 9:11 PM Jeyhun Karimov 
> wrote:
>
> > Hi Jim,
> >
> > Thanks for your comments. Please find my answers below:
> >
> >1. StreamOptimizeContext may still be needed to pass the fact that we
> >>are optimizing a streaming query.  I don't think this class will go
> >> away
> >>completely.  (I agree it may become more simple if the kind or
> >>mini-batch configuration can be removed.)
> >
> >
> > What I meant is that it might go away if we get rid of
> > *isUpdateBeforeRequired* and *getMiniBatchInterval *fields.
> > Of course if we can get rid of only one of them, then the
> > *StreamOptimizeContext* class will not be removed but get simpler.
> > Will update the doc accordingly.
> >
> >2. How are the mini-batch and changelog inference rules tightly
> coupled?
> >>I looked a little bit and I haven't seen any connection between them.
> >> It
> >>seems like the changelog inference is what needs to run multiple
> times.
> >
> >
> > Sorry for the misunderstanding. The mini-batch and changelog inference
> are
> > not coupled among themselves but with the high-level optimization logic.
> > The idea is to separate the query optimization into 1) optimize 2) enrich
> > with changelog inference 3) enrich with mini-batch interval inference and
> > 4) rewrite
> >
> >3. I think your point about code complexity is unnecessary.
> >> StreamOptimizeContext
> >>extends org.apache.calcite.plan.Context which is used an interface to
> >> pass
> >>information and objects through the Calcite stack.
> >
> >
> > I partially agree. Please see my answer above for the question 1.
> >
> >4. Is an alternative where the complexity of the changelog
> optimization
> >>can be moved into the `FlinkChangelogModeInferenceProgram`?  (If this
> >> is
> >>coupling between the mini-batch and changelog rules, then this would
> >> not
> >>make sense.)
> >
> >
> > Good point. Yes, this is definitely an alternative.
> >
> >5. There are some other smaller refactorings.  I tried some of them
> >>here: https://github.com/apache/flink/pull/24108 Mostly, it is
> syntax
> >>and using lazy vals to avoid recomputing various things.  (Feel free
> to
> >>take whatever actually works; I haven't run the tests.)
> >
> >
> > I took a look at your PR. For sure, some of the refactorings I will reuse
> > (probably rebase by the time I have this ready :))
> >
> >
> > Separately, folks on the Calcite dev list are thinking about multi-query
> >> optimization:
> >> https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
> >> https://issues.apache.org/jira/browse/CALCITE-6188
> >
> >
> > Seems interesting. But Calcite's MQO approach will probably require some
> > drastic changes in our codebase once we adopt it.
> > This approach is more incremental.
> >
> > Hope my comments answer your questions.
> >
> > Regards,
> > Jeyhun Karimov
> >
> > On Wed, Jan 17, 2024 at 2:36 AM Jim Hughes  >
> > wrote:
> >
> >> Hi Jeyhun,
> >>
> >>
> >> Generally, I like the idea of 

Re: [DISCUSS] Add "Special Thanks" Page on the Flink Website

2024-03-08 Thread Martijn Visser
Hi all,

I'm +1 on it. As long as we follow the ASF rules on this, we can thank
those that are/have made contributions.

Best regards,

Martijn

On Wed, Mar 6, 2024 at 7:45 AM Jark Wu  wrote:

> Hi Matthias,
>
> Thanks for your comments! Please see my reply inline.
>
> > What do we do if we have enough VMs? Do we still allow
> companies to add more VMs to the pool even though it's not adding any
> value?
>
> The ASF policy[1] makes it very clear: "Project Thanks pages are to show
> appreciation
> for goods that the project truly needs, not just for goods that someone
> wants to donate."
> Therefore, the community should reject new VMs if it is enough.
>
>
> > The community lacks the openly accessible tools to monitor the VM usage
> independently
> as far as I know (the Azure Pipelines project is owned by Ververica right
> now).
>
> The Azure pipeline account is sponsored by Ververica, and is managed by the
> community.
> AFAIK, Chesnay and Robert both have admin permissions [2] to the Azure
> pipeline project.
> Others can contact the managers to get access to the environment.
>
> > I figured that there could be a chance for us to
> rely on Apache-provided infrastructure entirely with our current workload
> when switching over from Azure Pipelines.
>
> That sounds great. We can return back the VMs and mark the donations as
> historical
> on the Thank Page once the new GitHub Actions CI is ready.
>
> > I am fine with creating a Thank You page to acknowledge the financial
> contributions from Alibaba and Ververica in the past (since Apache allows
> historical donations) considering that the contributions of the two
> companies go way back in time and are quite significant in my opinion. I
> suggest focusing on the past for now because of the option to migrate to
> Apache infrastructure midterm.
>
> Sorry, do you mean we only mention past donations for now?
> IIUC, the new GitHub Actions might be ready after the end of v1.20, which
> will probably be in half a year.
> I'm worried that if we say the sponsorship is ongoing until now (but it's
> not), it will confuse
> people and disrespect the sponsor.
>
> Besides, I'm not sure whether the new GitHub Actions CI will replace the
> machines for running
> flink-ci mirrors [3] and the flink benchmarks [4]. If not, I think it's
> inappropriate to say they are
> historical donations.
>
> Furthermore, we are collecting all kinds of donations. I just noticed that
> AWS donated [5] service costs
> for flink-connector-aws tests that hit real AWS services. This is an
> ongoing donation and I think it's not
> good to mark it as a historical donation. (Thanks for the donation, AWS,
> @Danny
> Cranmer  @HongTeoh!
> We should add it to the Thank Page!)
>
> Best,
> Jark
>
>
> [1]: https://www.apache.org/foundation/marks/linking#projectthanks
> [2]:
>
> https://cwiki.apache.org/confluence/display/FLINK/Continuous+Integration#ContinuousIntegration-Contacts
>
> [3]:
>
> https://cwiki.apache.org/confluence/display/FLINK/Continuous+Integration#ContinuousIntegration-Repositories
>
> [4]: https://lists.apache.org/thread/bkw6ozoflgltwfwmzjtgx522hyssfko6
>
> [5]: https://issues.apache.org/jira/browse/INFRA-24474
>
> On Wed, 6 Mar 2024 at 17:58, Matthias Pohl  wrote:
>
> > Thanks for starting this discussion. I see the value of such a page if we
> > want to encourage companies to sponsor CI infrastructure in case we need
> > this infrastructure (as Yun Tang pointed out). The question is, though:
> Do
> > we need more VMs? The amount of commits to master is constantly
> decreasing
> > since its peak in 2019/2020 [1]. Did we observe shortage of CI runners in
> > the past years? What do we do if we have enough VMs? Do we still allow
> > companies to add more VMs to the pool even though it's not adding any
> > value? Then it becomes a marketing tool for companies. The community
> lacks
> > the openly accessible tools to monitor the VM usage independently as far
> as
> > I know (the Azure Pipelines project is owned by Ververica right now). My
> > concern is (which goes towards what Max is saying) that this can be a
> > source of friction in the community (even if it's not about individuals
> but
> > companies). I'm not sure whether the need for additional infrastructure
> > out-weights the risk for friction.
> >
> > On another note: After monitoring the GitHub Action workflows (FLIP-396
> > [2]) for the past weeks, I figured that there could be a chance for us to
> > rely on Apache-provided infrastructure entirely with our current workload
> > when switching over from Azure Pipelines. But that might be a premature
> > judgement because the monitoring started after the feature freeze of
> Flink
> > 1.19. We should wait with a final conclusion till the end of the 1.20
> > release cycle. Apache Infra increased the amount of VMs they are offering
> > since 2018 (when the Apache Flink community decided to go for Azure
> > Pipelines and custom VMs as far as I know). That's based on a
> conversation
> 

Re: [DISCUSS] Support the Ozone Filesystem

2024-03-08 Thread Martijn Visser
Hi Ferenc,

I'm +0: I have seen no demand for Ozone, but if the community is OK with
it, why not.

Best regards,

Martijn

On Mon, Feb 26, 2024 at 6:08 AM Ferenc Csaky 
wrote:

> Hi,
>
> gentle reminder on this thread, any opinions or thoughts?
>
> Regards,
> Ferenc
>
>
>
>
> On Thursday, February 8th, 2024 at 18:02, Ferenc Csaky
>  wrote:
>
> >
> >
> > Hello devs,
> >
> > I would like to start a discussion regarding Apache Ozone FS support. The
> > jira [1] is stale for quite a while, but supporting it with some
> limitations could
> > be done with minimal effort.
> >
> > Ozone does not have a truncate() impl, so it falls into the same category as
> > Hadoop < 2.7 [2]. On the Datastream API it requires the usage of
> > OnCheckpointRollingPolicy when checkpointing is enabled, to make sure
> > the FileSink will not use truncate().
> >
> > Table API is a bit trickier, because the checkpointing policy cannot be
> > configured explicitly (why?); it behaves differently depending on the write
> > mode [3]. Bulk mode is covered, but for row format, auto-compaction has to
> > be set.
> >
> > Even with the mentioned limitations, I think it would be worth adding
> > support for OFS; it would require 1 small change to enable "ofs" [4] and
> > documenting the limitations.
> >
> > WDYT?
> >
> > Regards,
> > Ferenc
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-28231
> > [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/filesystem/#general
> > [3]
> https://github.com/apache/flink/blob/a33a0576364ac3d9c0c038c74362f1faac8d47b8/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java#L226
> > [4]
> https://github.com/apache/flink/blob/a33a0576364ac3d9c0c038c74362f1faac8d47b8/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java#L62
>


[jira] [Created] (FLINK-34624) Enable local recovery in ChangelogRescalingITCase

2024-03-08 Thread Yanfei Lei (Jira)
Yanfei Lei created FLINK-34624:
--

 Summary: Enable local recovery in ChangelogRescalingITCase
 Key: FLINK-34624
 URL: https://issues.apache.org/jira/browse/FLINK-34624
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / State Backends
Reporter: Yanfei Lei


Randomly enable local recovery in ChangelogRescalingITCase, since the local 
recovery of changelog state backend was supported by FLINK-27693 and 
FLINK-27692.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP Suggestion: Externalize Kudu Connector from Bahir

2024-03-08 Thread Hang Ruan
Hi, Ferenc.

Thanks for the FLIP discussion. +1 for the proposal.
I think that how to state this code originally lived in Bahir may be in the
FLIP.

Best,
Hang

Leonard Xu  wrote on Thursday, March 7, 2024 at 14:14:

> Thanks Ferenc for kicking off this discussion, I left some comments here:
>
> (1) About the release version, could you specify kudu connector version
> instead of flink version 1.18 as external connector version is different
> with flink ?
>
> (2) About the connector config options, could you enumerate these options
> so that we can review they’re reasonable or not?
>
> (3) Metrics is also key part of connector, could you add the supported
> connector metrics to public interface as well?
>
>
> Best,
> Leonard
>
>
> > On March 6, 2024 at 11:23 PM, Ferenc Csaky  wrote:
> >
> > Hello devs,
> >
> > Opening this thread to discuss a FLIP [1] about externalizing the Kudu
> connector, as recently
> > the Apache Bahir project was moved to the attic [2]. Some details were
> discussed already
> > in another thread [3]. I am proposing to externalize this connector and
> keep it maintainable,
> > and up to date.
> >
> > Best regards,
> > Ferenc
> >
> > [1]
> https://docs.google.com/document/d/1vHF_uVe0FTYCb6PRVStovqDeqb_C_FKjt2P5xXa7uhE
> > [2] https://bahir.apache.org/
> > [3] https://lists.apache.org/thread/2nb8dxxfznkyl4hlhdm3vkomm8rk4oyq
>
>


[jira] [Created] (FLINK-34623) Flink creating main.jar files in jobmanager jars upload folder

2024-03-08 Thread Nikhil_D (Jira)
Nikhil_D created FLINK-34623:


 Summary: Flink creating main.jar files in jobmanager jars upload 
folder
 Key: FLINK-34623
 URL: https://issues.apache.org/jira/browse/FLINK-34623
 Project: Flink
  Issue Type: Bug
  Components: flink-docker
Reporter: Nikhil_D


Flink is creating a main.jar file and uploading it to the jobmanager's jars 
upload folder. This would cause a lot of problems, like filling up the jars 
folder and filling up the metaspace.

Can we understand why this is happening, and is there any way to resolve this 
through Flink configuration or setup?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)