Re: [Re-DISCUSS] FLIP-202: Introduce ClickHouse Connector

2023-08-23 Thread Jing Ge
Hi Conrad,

Thanks for driving it! +1 for starting a new round of discussion based on
the input you wrote in the comment. Look forward to the new design.

Best regards,
Jing

On Thu, Aug 24, 2023 at 8:00 AM ConradJam  wrote:

> Hi Community
>
> I want to re-initiate the discussion related to FLIP-202, which concerns
> the ClickHouse connector. Some previous discussions are summarized in [1]
>
> ● We need an external code repository to host this code, as @Martijn
> Visser suggested, by creating flink-clickhouse-connector
> ● Discuss the common problems users need solved when writing from Flink
> to ClickHouse
>
>
> Now I want to redesign this functionality. The relevant FLIP will be
> updated after collecting opinions. Welcome to join the discussion.
>
> [1] FLINK-26999: Introduce ClickHouse Connector
>
>
> --
> Best
>
> ConradJam
>


[jira] [Created] (FLINK-32950) statsd reporter does not follow spec for counters

2023-08-23 Thread Paymahn (Jira)
Paymahn created FLINK-32950:
---

 Summary: statsd reporter does not follow spec for counters
 Key: FLINK-32950
 URL: https://issues.apache.org/jira/browse/FLINK-32950
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Reporter: Paymahn


The [statsd|https://github.com/statsd/statsd/blob/master/docs/metric_types.md] 
spec says the following:

> At each flush the current count is sent and reset to 0.

 

The flink [statsd 
reporter|https://github.com/apache/flink/blob/e5f78352a29df0d4dfe0c34369193896e7a1b4be/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L129-L131]
 does not reset the counter to 0 after each flush. Instead it reports 
cumulative values. This is not correct and causes issues with downstream 
clients which consume these statsd metrics.

 

One possible fix would be to add the following as a class variable

 
{code:java}
  protected final Map<Counter, Long> lastKnownCounterValues = new ConcurrentHashMap<>();{code}
and then modify the {{reportCounter}} function like so
{code:java}
  private void reportCounter(final DMetric metric, final Counter counter) {
    // the statsd protocol says that counters should be set to 0 after flushing
    // https://github.com/statsd/statsd/blob/master/docs/metric_types.md#counting
    // we don't want to actually change the value of the counter because it could
    // have unintended consequences. Instead, we keep track of the last known
    // value and report the delta
    long curCount = counter.getCount();
    long lastKnownCount = this.lastKnownCounterValues.getOrDefault(counter, 0L);
    send(metric.getName(), curCount - lastKnownCount, DMetricType.COUNTER, metric.getTags());
    this.lastKnownCounterValues.put(counter, curCount);
  }{code}
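The last-known-value bookkeeping behind this fix can be exercised in isolation. The sketch below (class and method names are illustrative, not Flink or statsd APIs) shows how tracking the previously reported value makes each flush emit only the increment, matching the statsd "reset to 0 after flush" semantics without mutating the counter itself:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-alone model of delta reporting for monotonically
// growing counters. "DeltaTracker" and "delta" are hypothetical names.
public class DeltaTracker {
    private final Map<String, Long> lastKnown = new ConcurrentHashMap<>();

    // Returns the value to send for this flush: current minus last reported.
    public long delta(String counterName, long currentCount) {
        long last = lastKnown.getOrDefault(counterName, 0L);
        lastKnown.put(counterName, currentCount);
        return currentCount - last;
    }

    public static void main(String[] args) {
        DeltaTracker t = new DeltaTracker();
        // A cumulative counter observed at 3, then 10, then 10 (no change):
        // the deltas sent are 3, 7, and 0 respectively.
        if (t.delta("requests", 3) != 3) throw new AssertionError();
        if (t.delta("requests", 10) != 7) throw new AssertionError();
        if (t.delta("requests", 10) != 0) throw new AssertionError();
        System.out.println("ok");
    }
}
```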
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[Re-DISCUSS] FLIP-202: Introduce ClickHouse Connector

2023-08-23 Thread ConradJam
Hi Community

I want to re-initiate the discussion related to FLIP-202, which concerns
the ClickHouse connector. Some previous discussions are summarized in [1]

● We need an external code repository to host this code, as @Martijn
Visser suggested, by creating flink-clickhouse-connector
● Discuss the common problems users need solved when writing from Flink
to ClickHouse


Now I want to redesign this functionality. The relevant FLIP will be
updated after collecting opinions. Welcome to join the discussion.

[1] FLINK-26999: Introduce ClickHouse Connector


-- 
Best

ConradJam


Re: [Discuss] FLIP-355: Add parent dir of files to classpath using yarn.provided.lib.dirs

2023-08-23 Thread Yang Wang
+1 for this FLIP

Maybe a FLIP is an overkill for this enhancement.


Best,
Yang

Venkatakrishnan Sowrirajan  wrote on Wednesday, August 23, 2023 at 01:44:

> Thanks for the FLIP, Archit.
>
> This is definitely quite a useful addition to the *yarn.provided.lib.dirs*
> . +1.
>
> IMO, except for the fact that *yarn.provided.lib.dirs* (platform specific
> jars can be cached) takes only directories whereas *yarn.ship-files* (user
> files) takes both files and dirs, the overall logic in terms of
> constructing the classpath in both the cases should be roughly the same.
>
> Referencing the PR (https://github.com/apache/flink/pull/23164) with the
> initial implementation you created as well here.
>
> Regards
> Venkata krishnan
>
>
> On Tue, Aug 22, 2023 at 10:09 AM Archit Goyal  >
> wrote:
>
> > Hi all,
> >
> > Gentle ping if I can get a review on the FLIP.
> >
> > Thanks,
> > Archit Goyal
> >
> > From: Archit Goyal 
> > Date: Thursday, August 17, 2023 at 5:51 PM
> > To: dev@flink.apache.org 
> > Subject: [Discuss] FLIP-355: Add parent dir of files to classpath using
> > yarn.provided.lib.dirs
> > Hi All,
> >
> > I am opening this thread to discuss the proposal to add parent
> directories
> > of files to classpath when using yarn.provided.lib.dirs. This is
> documented
> > in FLIP-355 <
> >
> https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/FLINK/FLIP*355*3A*Add*parent*dir*of*files*to*classpath*using*yarn.provided.lib.dirs__;KyUrKysrKysrKys!!IKRxdwAv5BmarQ!fFlyBKWuWwYcWfOcpLflTTi36tyHPiENIUry9J0ygaZY0VURnIs0glu5yafV0w0tfSsnOb9ZxDD9Cjw2TApTekVU$
> > >.
> >
> > This FLIP mentions about enhancing YARN's classpath configuration to
> > include parent directories of files in yarn.provided.lib.dirs.
> >
> > Please feel free to reply to this email thread and share your opinions.
> >
> > Thanks,
> > Archit Goyal
> >
>


Re: [DISCUSS] FLIP-334 : Decoupling autoscaler and kubernetes

2023-08-23 Thread Matt Wang
Sorry for the late reply, I still have a small question here:
does the handlerRecommendedParallelism method in
AutoScalerEventHandler conflict with
handlerScalingFailure/handlerScalingReport (one of them
handles the scale-failure event, and the other handles
the scale-success event)?



--

Best,
Matt Wang


 Replied Message 
| From | Rui Fan<1996fan...@gmail.com> |
| Date | 08/21/2023 17:41 |
| To |  |
| Cc | Maximilian Michels ,
Gyula Fóra ,
Matt Wang |
| Subject | Re: [DISCUSS] FLIP-334 : Decoupling autoscaler and kubernetes |
Hi Max, Gyula and Matt,

Do you have any other comments?

The flink-kubernetes-operator 1.6 has been released recently,
it's a good time to kick off this FLIP.

Please let me know if you have any questions or concerns,
looking forward to your feedback, thanks!

Best,
Rui

On Wed, Aug 9, 2023 at 11:55 AM Rui Fan <1996fan...@gmail.com> wrote:

Hi Matt Wang,

Thanks for your discussion here.

it is recommended to unify the descriptions of AutoScalerHandler
and AutoScalerEventHandler in the FLIP

Good catch, I have updated all AutoScalerHandler to
AutoScalerEventHandler.

Can it support the use of zookeeper (zookeeper is a relatively
common use of flink HA)?

In my opinion, it's a good suggestion. However, I prefer we
implement other state stores in the other FLINK JIRA, and
this FLIP focus on the decoupling and implementing the
necessary state store. Does that make sense?

Regarding each scaling information, can it be persisted in
the shared file system through the filesystem? I think it will
be a more valuable requirement to support viewing
Autoscaling info on the UI in the future, which can provide
some foundations in advance;

This is a good suggestion as well. It's useful for users to check
the scaling information. I propose to add a CompositeEventHandler,
it can include multiple EventHandlers.
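As a rough illustration of the composite idea, such a handler could simply fan each event out to a list of delegates. The interface and method names below are placeholders of my own, not the actual FLIP-334 API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: a composite event handler that delegates each
// event to multiple underlying handlers (e.g. one per backend).
public class CompositeHandlerSketch {
    interface EventHandler {
        void handleEvent(String event);
    }

    static class CompositeEventHandler implements EventHandler {
        private final List<EventHandler> delegates;

        CompositeEventHandler(List<EventHandler> delegates) {
            this.delegates = delegates;
        }

        @Override
        public void handleEvent(String event) {
            // Fan the event out to every registered handler.
            for (EventHandler h : delegates) {
                h.handleEvent(event);
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        EventHandler kubernetesHandler = e -> log.add("k8s:" + e);
        EventHandler filesystemHandler = e -> log.add("fs:" + e);
        new CompositeEventHandler(Arrays.asList(kubernetesHandler, filesystemHandler))
                .handleEvent("scaled");
        if (!log.equals(Arrays.asList("k8s:scaled", "fs:scaled"))) {
            throw new AssertionError();
        }
        System.out.println(log); // prints [k8s:scaled, fs:scaled]
    }
}
```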

However, as the last question, I prefer we implement other
event handler in the other FLINK JIRA. What do you think?

A solution mentioned in FLIP is to initialize the
AutoScalerEventHandler object every time an event is
processed.

No, the FLIP mentioned `The AutoScalerEventHandler  object is shared for
all flink jobs`,
So the AutoScalerEventHandler is only initialized once.

And we call the AutoScalerEventHandler#handlerXXX
every time an event is processed.

Best,
Rui

On Tue, Aug 8, 2023 at 9:40 PM Matt Wang  wrote:

Hi Rui

Thanks for driving the FLIP.

I agree with the point of this FLIP. This FLIP first provides a
general Autoscaler function in the Flink repo, and there is no
need to move kubernetes-autoscaler from kubernetes-operator
to the Flink repo in this FLIP (it is recommended to unify the
descriptions of AutoScalerHandler and AutoScalerEventHandler
in the FLIP). Here I still have a few questions:

1. AutoScalerStateStore mainly records the state information
during Scaling. In addition to supporting the use of configmap,
can it support the use of zookeeper (zookeeper is a relatively
common use of flink HA)?
2. Regarding each scaling information, can it be persisted in
the shared file system through the filesystem? I think it will
be a more valuable requirement to support viewing
Autoscaling info on the UI in the future, which can provide
some foundations in advance;
3. A solution mentioned in FLIP is to initialize the
AutoScalerEventHandler object every time an event is
processed. What is the main purpose of this solution?



--

Best,
Matt Wang


 Replied Message 
| From | Rui Fan<1996fan...@gmail.com> |
| Date | 08/7/2023 11:34 |
| To |  |
| Cc | m...@apache.org ,
Gyula Fóra |
| Subject | Re: [DISCUSS] FLIP-334 : Decoupling autoscaler and kubernetes
|
Hi Ron:

Thanks for the feedback! The goal is indeed to turn the autoscaler into
a general tool that can support other resource management.


Hi Max, Gyula:

My proposed `AutoScalerStateStore` is similar to Map, it can really be
improved.

public interface AutoScalerStateStore<KEY> {
    Map<String, String> getState(KEY jobKey);
    void updateState(KEY jobKey, Map<String, String> state);
}
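For illustration only, an in-memory variant of such a store could look like the following. The generic parameter and the Map<String, String> payload are assumptions on my side, not the finalized FLIP interface:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory state store sketch, keyed by an arbitrary job key.
public class StateStoreSketch {
    interface AutoScalerStateStore<KEY> {
        Map<String, String> getState(KEY jobKey);
        void updateState(KEY jobKey, Map<String, String> state);
    }

    static class InMemoryStateStore<KEY> implements AutoScalerStateStore<KEY> {
        // Shared across jobs; each job's state is stored under its own key.
        private final Map<KEY, Map<String, String>> store = new ConcurrentHashMap<>();

        @Override
        public Map<String, String> getState(KEY jobKey) {
            return store.getOrDefault(jobKey, Collections.emptyMap());
        }

        @Override
        public void updateState(KEY jobKey, Map<String, String> state) {
            store.put(jobKey, state);
        }
    }

    public static void main(String[] args) {
        InMemoryStateStore<String> store = new InMemoryStateStore<>();
        Map<String, String> state = new HashMap<>();
        state.put("parallelism", "4");
        store.updateState("job-1", state);
        if (!"4".equals(store.getState("job-1").get("parallelism"))) {
            throw new AssertionError();
        }
        if (!store.getState("unknown-job").isEmpty()) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```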

From the method parameter, the StateStore is shared by all jobs, right?
If yes, the `KEY jobKey` isn't enough because the CR is needed during
creating the ConfigMap. The `jobKey` is ResourceID, the extraJobInfo is
CR.

So, this parameter may need to be changed from `KEY jobKey` to
`JobAutoScalerContext`. Does that make sense?
If yes, I can update the interface in the FLIP doc.

Best,
Rui

On Mon, Aug 7, 2023 at 10:18 AM liu ron  wrote:

Hi, Rui

Thanks for driving the FLIP.

The tuning of streaming jobs by autoscaler is very important. Although the
mainstream trend now is cloud-native, many companies still run their Flink
jobs on Yarn for historical reasons. If we can decouple autoscaler from
K8S
and turn it into a common tool that can support other resource management
frameworks such as Yarn, I think it will be very meaningful.
+1 for this proposal.

Best,
Ron


Gyula Fóra  wrote on Saturday, August 5, 2023 at 15:03:

Hi Rui!

Thanks for the proposal.

I agree with Max on tha

[jira] [Created] (FLINK-32949) Allow specifying the ServerSocket port for the collect function when accessing the TaskManager from the client.

2023-08-23 Thread JiaJian He (Jira)
JiaJian He created FLINK-32949:
--

 Summary: Allow specifying the ServerSocket port for the collect 
function when accessing the TaskManager from the client.
 Key: FLINK-32949
 URL: https://issues.apache.org/jira/browse/FLINK-32949
 Project: Flink
  Issue Type: Improvement
  Components: API / Core, API / DataStream, Runtime / Configuration
Reporter: JiaJian He


In the context of [#12069|https://github.com/apache/flink/pull/12069], the 
initialization of the {{CollectSinkFunction$ServerThread}} currently uses port 
0, which corresponds to a random port assignment.

Issues might arise under the following circumstances:
 # When the JobManager and TaskManager are deployed on different servers.
 # When network communication between servers requires specific ports to be 
open.
 # When using {{sql-client.sh}} at the JobManager to execute operations like 
selecting data, the CollectSinkFunction$ServerThread running on the TaskManager 
using a random port can lead to data retrieval failures.

The purpose of this pull request is to address this problem by introducing a 
configuration parameter, 'taskmanager.collect.port', which allows specifying 
the port for the {{CollectSinkFunction$ServerThread}}.
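A minimal, Flink-independent sketch of the difference between the two binding modes: port 0 lets the OS pick an arbitrary ephemeral port (which firewalls cannot allow-list ahead of time), while a configured port is predictable. The port number below is arbitrary, chosen for illustration only:

```java
import java.io.IOException;
import java.net.ServerSocket;

// Demonstrates ephemeral (port 0) vs. fixed-port binding.
public class PortDemo {
    public static void main(String[] args) throws IOException {
        try (ServerSocket random = new ServerSocket(0)) {
            // The OS picked the port; it generally differs between runs,
            // so it cannot be opened in a firewall in advance.
            System.out.println("ephemeral port: " + random.getLocalPort());
            if (random.getLocalPort() <= 0) throw new AssertionError();
        }
        try (ServerSocket fixed = new ServerSocket(46123)) {
            // A configured port is known up front and can be allow-listed.
            if (fixed.getLocalPort() != 46123) throw new AssertionError();
        }
    }
}
```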



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-356: Support Nested Fields Filter Pushdown

2023-08-23 Thread Venkatakrishnan Sowrirajan
Becket and Jark,

 Deprecate all the other
> methods except tryApplyFilters() and tryApplyProjections().

For *SupportsProjectionPushDown*, we still need a
*supportsNestedProjections* API on the table source as some of the table
sources might not be able to handle nested fields and therefore the Flink
planner should not push down the nested projections or else the
*applyProjection
*API has to be appropriately changed to return
*unconvertibleProjections *similar
to *SupportsFilterPushDown*.

Or we have to introduce two different applyProjections()
> methods for FieldReferenceExpression / NestedFieldReferenceExpression
> respectively.

Agree this is not preferred. Given that *supportNestedProjections *cannot
be deprecated/removed based on the current API form, extending
*FieldReferenceExpression* to support nested fields should be okay.

Another alternative could be to change *applyProjections *to take
List and on the connector side they choose to handle
*FieldReferenceExpression* and *NestedFieldReferenceExpression *as
applicable and return the remainingProjections. In the case of nested field
projections not supported, it should return them back but only projecting
the top level fields. IMO, this is also *not preferred*.

*SupportsAggregatePushDown*

*AggregateExpression *currently takes in a list of
*FieldReferenceExpression* as args for the aggregate function, if in future
*SupportsAggregatePushDown* adds support for aggregate pushdown on nested
fields then the AggregateExpression API also has to change if a new
NestedFieldReferenceExpression is introduced for nested fields.

If we add a
> flag for each new filter,
> the interface will be filled with lots of flags (e.g., supportsBetween,
> supportsIN)

In an ideal situation, I completely agree with you. But in the current
state, *supportsNestedFilters* can act as a bridge to reach the eventual
desired state which is to have a clean and consistent set of APIs
throughout all Supports*PushDown.

Also shared some thoughts on the end state API

with extension to the *FieldReferenceExpression* to support nested fields.
Please take a look.

Regards
Venkata krishnan

On Tue, Aug 22, 2023 at 5:02 PM Becket Qin  wrote:

> Hi Jark,
>
> Regarding the migration path, it would be useful to scrutinize the use cases
> of FieldReferenceExpression and ResolvedExpressions. There are two kinds of
> use cases:
>
> 1. A ResolvedExpression is constructed by the user or connector / plugin
> developers.
> 2. A ResolvedExpression is constructed by the framework and passed to user
> or connector / plugin developers.
>
> For the first case, both of the approaches provide the same migration
> experience.
>
> For the second case, generally speaking, introducing
> NestedFieldReferenceExpression and extending FieldReferenceExpression would
> have the same impact for backwards compatibility. SupportsFilterPushDown is
> a special case here because understanding the filter expressions is
> optional for the source implementation. In other use cases, if
> understanding the reference to a nested field is a must have, the user code
> has to be changed, regardless of which approach we take to support nested
> fields.
>
> Therefore, I think we have to check each public API where the nested field
> reference is exposed. If we have many public APIs where understanding
> nested fields is optional for the user  / plugin / connector developers,
> having a separate NestedFieldReferenceExpression would have a more smooth
> migration. Otherwise, there seems to be no difference between the two
> approaches.
>
> Migration path aside, the main reason I prefer extending
> FieldReferenceExpression over a new NestedFieldReferenceExpression is
> because this makes the SupportsProjectionPushDown interface simpler.
> Otherwise, we have to treat it as a special case that does not match the
> overall API style. Or we have to introduce two different applyProjections()
> methods for FieldReferenceExpression / NestedFieldReferenceExpression
> respectively. This issue further extends to implementation in addition to
> public API. A single FieldReferenceExpression might help simplify the
> implementation code a little bit. For example, in a recursive processing of
> a row with nested rows, we may not need to switch between
> FieldReferenceExpression and NestedFieldReferenceExpression depending on
> whether the record being processed is a top level record or nested record.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Tue, Aug 22, 2023 at 11:43 PM Jark Wu  wrote:
>
> > Hi Becket,
> >
> > I totally agree we should try to have a consistent API for a final state.
> > The only concern I have mentioned is the "smooth" migration path.
> > The FieldReferenceExpression is widely used in many public APIs,
> > not only in the SupportsFilterPushDown. Yes, we can change every
> > methods in 2-steps, but is it good to change API back a

Re: Flink 1.17.2 planned?

2023-08-23 Thread Jing Ge
Hi Christian,

Thanks for your understanding. We will take a look at 1.17.2, once the 1.18
release is done. In the meantime, there might be someone in the community
who volunteers to be the 1.17.2 release manager. You will see related email
threads on the Dev. Stay tuned please :-)

Best regards,
Jing

On Wed, Aug 23, 2023 at 9:27 AM Christian Lorenz 
wrote:

> Hi Jing,
>
>
>
> thanks for the answer. I have no idea what kind of work is needed for
> being a release manager. I think we’ll have to wait for the release then
> (if really urgent, the blocking bug can also be patched by us).
>
>
>
> Kind regards,
>
> Christian
>
>
>
> *From: *Jing Ge via user 
> *Date: *Tuesday, August 22, 2023 at 11:40
> *To: *liu ron 
> *Cc: *u...@flink.apache.org 
> *Subject: *Re: Flink 1.17.2 planned?
>
>
> Hi Christian,
>
>
>
> Thanks for reaching out. Liked Ron pointed out that the community is
> focusing on the 1.18 release. If you are facing urgent issues, would you
> like to volunteer as the release manager of 1.17.2 and drive the release?
> Theoretically, everyone could be the release manager of a bugs fix release.
>
>
>
> Best regards,
>
> Jing
>
>
>
> On Tue, Aug 22, 2023 at 3:41 AM liu ron  wrote:
>
> Hi, Christian
>
>
>
> We released 1.17.1 [1] in May, and the main focus of the community is
> currently on the 1.18 release, so 1.17.2 should be planned for after the
> 1.18 release!
>
>
>
> [1]
> https://flink.apache.org/2023/05/25/apache-flink-1.17.1-release-announcement/
>
>
>
>
>
> Best,
>
> Ron
>
>
>
Christian Lorenz via user  wrote on Monday, August 21, 2023 at 17:33:
>
> Hi team,
>
>
>
> are there any infos about a bugfix release 1.17.2 available? E.g. will
> there be another bugfix release of 1.17 / approximate timing?
>
> We are hit by https://issues.apache.org/jira/browse/FLINK-32296 which
> leads to wrong SQL responses in some circumstances.
>
>
>
> Kind regards,
>
> Christian
>
> This e-mail is from Mapp Digital Group and its international legal
> entities and may contain information that is confidential.
> If you are not the intended recipient, do not read, copy or distribute the
> e-mail or any attachments. Instead, please notify the sender and delete the
> e-mail and any attachments.
>
>


[jira] [Created] (FLINK-32948) Minimize visibility of parameterized tests

2023-08-23 Thread Jiabao Sun (Jira)
Jiabao Sun created FLINK-32948:
--

 Summary: Minimize visibility of parameterized tests
 Key: FLINK-32948
 URL: https://issues.apache.org/jira/browse/FLINK-32948
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Affects Versions: 1.17.1
Reporter: Jiabao Sun


Since [FLINK-32942|https://issues.apache.org/jira/browse/FLINK-32942] was 
merged into master, we can minimize the visibility of parameterized tests.

Before
{code:java}
@ExtendWith(ParameterizedTestExtension.class)
public class ParameterizedTestExtensionTest {

    private static final List<Integer> PARAMETERS = Arrays.asList(1, 2);

    @Parameter
    public Integer parameter;

    @Parameters
    public static List<Integer> parameters() {
        return PARAMETERS;
    }

    @TestTemplate
    void testWithParameters() {
        assertThat(parameter).isIn(PARAMETERS);
    }
}
{code}

Now
{code:java}
@ExtendWith(ParameterizedTestExtension.class)
class ParameterizedTestExtensionTest {

    private static final List<Integer> PARAMETERS = Arrays.asList(1, 2);

    @Parameter
    private Integer parameter;

    @Parameters
    private static List<Integer> parameters() {
        return PARAMETERS;
    }

    @TestTemplate
    void testWithParameters() {
        assertThat(parameter).isIn(PARAMETERS);
    }
}
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32947) Autoscaler standalone mode supports the metric report

2023-08-23 Thread Rui Fan (Jira)
Rui Fan created FLINK-32947:
---

 Summary: Autoscaler standalone mode supports the metric report
 Key: FLINK-32947
 URL: https://issues.apache.org/jira/browse/FLINK-32947
 Project: Flink
  Issue Type: New Feature
  Components: Autoscaler
Reporter: Rui Fan


FLIP-334 is decoupling the autoscaler and kubernetes; the autoscaler will be 
able to run in standalone mode after FLIP-334.

The flink kubernetes operator itself sets up the metrics reporters and provides 
the base metric groups. For the standalone implementation we need to create a 
new metric reporter; otherwise we cannot report the metrics.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS][2.0] Deprecating Accumulator in favor of MetricsGroup

2023-08-23 Thread Matthias Pohl
Hi everyone,
I was looking into serializing the ArchivedExecutionGraph for another FLIP
and came across Accumulators [1] (don't mix that one up with the window
accumulators of the Table/SQL API). Accumulators were introduced in Flink
quite a while ago in Stratosphere PR #340 [2].

I had a brief chat with Chesnay about it who pointed out that there was an
intention to use this for collecting metrics in the past. The Accumulator
JavaDoc provides a hint that it was inspired by Hadoop's Counter concept
[3] which also sounds like it is more or less equivalent to Flink's metrics.

The Accumulator is currently accessible through the RuntimeContext
interface, which provides addAccumulator [4] and getAccumulator [5]. Usages
of these methods appear in the following classes:
- CollectSinkFunction [6]: Here it's used to collect the final data when
closing the function. This feels like a misuse of the feature. Instead, the
CollectSink could block the close call until all data was fetched from the
client program.
- DataSet.collect() [7]: Uses CollectHelper, which utilizes
SerializedListAccumulator to collect the final data similarly to
CollectSinkFunction
- EmptyFieldsCountAccumulator [8] is an example program that counts empty
fields. This could be migrated to MetricGroups
- ChecksumHashCodeHelper [9] is used in DataSetUtils where the calling
method is marked as deprecated for 2.0 already
- CollectOutputFormat [10] uses SerializedListAccumulator analogously to
DataSet.collect(). This class will be removed with the removal of the Scala
API in 2.0.

The initial investigation brings me to the conclusion that we can remove
the Accumulator feature in favor of Metrics and proper collect
implementations. That would also help clean up the
(Archived)ExecutionGraph: IMHO, we should have a clear separation between
Metrics (which are part of the ExecutionGraph) and processed data (which
shouldn't be part of the ExecutionGraph).

I'm curious what others think about this. Did I miss a scenario where
Accumulators are actually needed? Or is this already part of some other 2.0
effort [11] which I missed? I would suggest that removing it could be a
nice-to-have item for 2.0.

Best,
Matthias



[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java

[2] https://github.com/stratosphere/stratosphere/pull/340
[3]
https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Counters.html
[4]
https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L156
[5]
https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L165

[6]
https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L304
[7]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L145
[8]
https://github.com/apache/flink/blob/aa98c18d2ba975479fcfa4930b0139fa575d303e/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java#L156
[9]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L256
[10]
https://github.com/apache/flink/blob/91d81c427aa6312841ca868d54e8ce6ea721cd60/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/sinks/CollectTableSink.scala#L70

[11] https://cwiki.apache.org/confluence/display/FLINK/2.0+Release

-- 


*Matthias Pohl*
Opensource Software Engineer, *Aiven*
matthias.p...@aiven.io|  +49 170 9869525
aiven.io
*Aiven Deutschland GmbH*
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B


Re: [ANNOUNCE] release-1.18 branch cut

2023-08-23 Thread Rui Fan
Thanks for the quick response!

Best,
Rui

On Wed, Aug 23, 2023 at 9:46 PM Jing Ge  wrote:

> yes please
>
> On Wed, Aug 23, 2023 at 3:35 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> > Hi Jing,
> >
> > Thanks for the effort and update!
> >
> > It means the PR of flink-1.19 can be merged to master branch, right?
> >
> > Best,
> > Rui
> >
> > On Wed, Aug 23, 2023 at 9:29 PM Jing Ge 
> > wrote:
> >
> > > Hi devs, The release-1.18 branch has been forked out from the master
> > > branch, with commit ID cfa4a9c35563fd8a5973ec2f35251190e365be14. The
> > > version on the master branch has been upgraded to 1.19-SNAPSHOT. From
> now
> > > on, for PRs that should be presented in 1.18.0, please make sure: Merge
> > the
> > > PR into both master and release-1.18 branches The JIRA ticket should be
> > > closed with the correct fix-versions (1.18.0). The umbrella issue [1]
> for
> > > release testing has been created. Please create subtasks for your new
> > > features under this issue, and make a detailed description on how to
> > verify
> > > it. We plan to finish all release testing in the next two weeks (until
> > Sept
> > > 05, 2023), and please update the “X-team verified” column in the 1.18
> > > release wiki page [2] in the meantime. Also, we’d like to thank all
> > > contributors who put effort into stabilizing the CI on the master
> branch
> > in
> > > the past week, and look forward to stabilizing new features in the
> coming
> > > weeks. Good luck with your release testing! Best regards,
> > > Konstantin, Sergey, Qingsheng, and Jing
> > >
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-32726
> > > [2] https://cwiki.apache.org/confluence/display/FLINK/1.18+Release
> > >
> >
>


Re: [ANNOUNCE] release-1.18 branch cut

2023-08-23 Thread Jing Ge
yes please

On Wed, Aug 23, 2023 at 3:35 PM Rui Fan <1996fan...@gmail.com> wrote:

> Hi Jing,
>
> Thanks for the effort and update!
>
> It means the PR of flink-1.19 can be merged to master branch, right?
>
> Best,
> Rui
>
> On Wed, Aug 23, 2023 at 9:29 PM Jing Ge 
> wrote:
>
> > Hi devs, The release-1.18 branch has been forked out from the master
> > branch, with commit ID cfa4a9c35563fd8a5973ec2f35251190e365be14. The
> > version on the master branch has been upgraded to 1.19-SNAPSHOT. From now
> > on, for PRs that should be presented in 1.18.0, please make sure: Merge
> the
> > PR into both master and release-1.18 branches The JIRA ticket should be
> > closed with the correct fix-versions (1.18.0). The umbrella issue [1] for
> > release testing has been created. Please create subtasks for your new
> > features under this issue, and make a detailed description on how to
> verify
> > it. We plan to finish all release testing in the next two weeks (until
> Sept
> > 05, 2023), and please update the “X-team verified” column in the 1.18
> > release wiki page [2] in the meantime. Also, we’d like to thank all
> > contributors who put effort into stabilizing the CI on the master branch
> in
> > the past week, and look forward to stabilizing new features in the coming
> > weeks. Good luck with your release testing! Best regards,
> > Konstantin, Sergey, Qingsheng, and Jing
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-32726
> > [2] https://cwiki.apache.org/confluence/display/FLINK/1.18+Release
> >
>


Re: [ANNOUNCE] release-1.18 branch cut

2023-08-23 Thread Rui Fan
Hi Jing,

Thanks for the effort and update!

It means the PR of flink-1.19 can be merged to master branch, right?

Best,
Rui

On Wed, Aug 23, 2023 at 9:29 PM Jing Ge  wrote:

> Hi devs, The release-1.18 branch has been forked out from the master
> branch, with commit ID cfa4a9c35563fd8a5973ec2f35251190e365be14. The
> version on the master branch has been upgraded to 1.19-SNAPSHOT. From now
> on, for PRs that should be presented in 1.18.0, please make sure: Merge the
> PR into both master and release-1.18 branches The JIRA ticket should be
> closed with the correct fix-versions (1.18.0). The umbrella issue [1] for
> release testing has been created. Please create subtasks for your new
> features under this issue, and make a detailed description on how to verify
> it. We plan to finish all release testing in the next two weeks (until Sept
> 05, 2023), and please update the “X-team verified” column in the 1.18
> release wiki page [2] in the meantime. Also, we’d like to thank all
> contributors who put effort into stabilizing the CI on the master branch in
> the past week, and look forward to stabilizing new features in the coming
> weeks. Good luck with your release testing! Best regards,
> Konstantin, Sergey, Qingsheng, and Jing
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-32726
> [2] https://cwiki.apache.org/confluence/display/FLINK/1.18+Release
>


[ANNOUNCE] release-1.18 branch cut

2023-08-23 Thread Jing Ge
Hi devs,

The release-1.18 branch has been forked out from the master branch, with
commit ID cfa4a9c35563fd8a5973ec2f35251190e365be14. The version on the
master branch has been upgraded to 1.19-SNAPSHOT.

From now on, for PRs that should be presented in 1.18.0, please make sure:
● Merge the PR into both master and release-1.18 branches
● The JIRA ticket should be closed with the correct fix-versions (1.18.0)

The umbrella issue [1] for release testing has been created. Please create
subtasks for your new features under this issue, and make a detailed
description on how to verify it. We plan to finish all release testing in
the next two weeks (until Sept 05, 2023), and please update the “X-team
verified” column in the 1.18 release wiki page [2] in the meantime.

Also, we’d like to thank all contributors who put effort into stabilizing
the CI on the master branch in the past week, and look forward to
stabilizing new features in the coming weeks. Good luck with your release
testing!

Best regards,
Konstantin, Sergey, Qingsheng, and Jing


[1] https://issues.apache.org/jira/browse/FLINK-32726
[2] https://cwiki.apache.org/confluence/display/FLINK/1.18+Release


Re: [DISCUSS] FLIP-319: Integrating with Kafka’s proper support for 2PC participation (KIP-939).

2023-08-23 Thread Gyula Fóra
Hi Gordon!

Thank you for preparing the detailed FLIP, I think this is a huge
improvement that enables the exactly-once Kafka sink in many environments /
use-cases where this was previously unfeasible due to the limitations
highlighted in the FLIP.

Big +1

Cheers,
Gyula

On Fri, Aug 18, 2023 at 7:54 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Flink devs,
>
> I’d like to officially start a discussion for FLIP-319: Integrating with
> Kafka’s proper support for 2PC participation (KIP-939) [1].
>
> This is the “sister” joint FLIP for KIP-939 [2] [3]. It has been a
> long-standing issue that Flink’s Kafka connector doesn’t work fully
> correctly under exactly-once mode due to lack of distributed transaction
> support in the Kafka transaction protocol. This has led to subpar hacks in
> the connector such as Java reflections to workaround the protocol's
> limitations (which causes a bunch of problems on its own, e.g. long
> recovery times for the connector), while still having corner case scenarios
> that can lead to data loss.
>
> This joint effort with the Kafka community attempts to address this so that
> the Flink Kafka connector can finally work against public Kafka APIs, which
> should result in a much more robust integration between the two systems,
> and for Flink developers, easier maintainability of the code.
>
> Obviously, actually implementing this FLIP relies on the joint KIP being
> implemented and released first. Nevertheless, I'd like to start the
> discussion for the design as early as possible so we can benefit from the
> new Kafka changes as soon as it is available.
>
> Looking forward to feedback and comments on the proposal!
>
> Thanks,
> Gordon
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710
> [2]
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC
> [3] https://lists.apache.org/thread/wbs9sqs3z1tdm7ptw5j4o9osmx9s41nf
>


[jira] [Created] (FLINK-32946) Start End of Life discussion thread for now outdated Flink minor version

2023-08-23 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-32946:
-

 Summary: Start End of Life discussion thread for now outdated 
Flink minor version
 Key: FLINK-32946
 URL: https://issues.apache.org/jira/browse/FLINK-32946
 Project: Flink
  Issue Type: Sub-task
Reporter: Matthias Pohl


The idea is to discuss in the community whether we should do a final release 
for the now-unsupported minor version. Such a release shouldn't be covered by 
the current minor version's release managers; their only responsibility is to 
trigger the discussion.

The intention of a final patch release for the now-unsupported Flink minor 
version is to flush out all the fixes that didn't end up in the previous 
release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32945) NullPointerException when executing TopSpeedWindowing example with checkpointing enabled

2023-08-23 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-32945:
-

 Summary: NullPointerException when executing TopSpeedWindowing 
example with checkpointing enabled
 Key: FLINK-32945
 URL: https://issues.apache.org/jira/browse/FLINK-32945
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.18.0
Reporter: Matthias Pohl
 Attachments: data.txt.gz

I tried running the TopSpeedWindowing example with a checkpoint interval of 10s 
and the data input file that I attached to this Jira. I ran into the following 
NullPointerException:
{code}
2023-08-23 09:56:03,111 ERROR 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor   [] - Error while 
executing remote procedure call public void 
org.apache.flink.runtime.jobmaster.JobMaster.notifyEndOfData(org.apache.flink.runtime.executiongraph.ExecutionAttemptID).
java.lang.reflect.InvocationTargetException: null
at jdk.internal.reflect.GeneratedMethodAccessor20.invoke(Unknown 
Source) ~[?:?]
at 
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$0(PekkoRpcActor.java:301)
 ~[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[classes/:?]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:300)
 ~[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
 ~[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
 ~[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
 ~[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at 
org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) 
[flink-rpc-akka4947976d-da31-483f-88c7-8f3ce057edfe.jar:1.18-SNAPSHOT]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) [?:?]
at 
java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
 [?:?]
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) [?:?]
at java.util.concurren

Re: [DISCUSS] FLIP-323: Support Attached Execution on Flink Application Completion for Batch Jobs

2023-08-23 Thread Becket Qin
Hi Weihua,

Just want to clarify. "client.attached.after.submission" is going to be a
pure client side configuration.

On the cluster side, it is only "execution.shutdown-on-attached-exit"
controlling whether the cluster will shutdown or not when an attached
client is disconnected. In order to honor this configuration, the cluster
needs to know if the client submitting the job is attached or not. But the
cluster will not retrieve this information by reading the configuration of
"client.attached.after.submission". In fact this configuration should not
even be visible to the cluster. The cluster only knows if a client is
attached or not when a client submits a job.

Thanks,

Jiangjie (Becket) Qin
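
A minimal sketch of the resulting split of responsibilities, using the option
names discussed in this thread ("client.attached.after.submission" is still
only a proposal in FLIP-323, so treat this as illustrative, not final syntax):

```yaml
# Client side only: whether the submitting client stays attached after
# submission. Per the discussion above, this value is never read by the
# cluster; the cluster only learns the attachment status at submission time.
client.attached.after.submission: true

# Cluster side only: shut the cluster down when an attached client exits.
execution.shutdown-on-attached-exit: true
```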



On Wed, Aug 23, 2023 at 2:35 PM Weihua Hu  wrote:

> Hi, Jiangjie
>
> Thanks for the clarification.
>
> My key point is the meaning of the "submission" in
> "client.attached.after.submission".
> At first glance, I thought only job submissions were taken into account.
> After your clarification, this option also works for cluster submissions.
>
> It's fine for me.
>
> Best,
> Weihua
>
>
> On Wed, Aug 23, 2023 at 8:35 AM Becket Qin  wrote:
>
> > Hi Weihua,
> >
> > Thanks for the explanation. From the doc, it looks like the current
> > behaviors of "execution.attached=true" between Yarn and K8S session
> > cluster are exactly the opposite. For YARN it basically means the cluster
> > will shutdown if the client disconnects. For K8S, it means the cluster
> will
> > not shutdown until a client explicitly stops it. This sounds like a bad
> > situation to me and needs to be fixed.
> >
> > My guess is that the YARN behavior here is the original intended
> behavior,
> > while K8S reused the configuration for a different purpose. If we
> deprecate
> > the execution.attached config here. The behavior would be:
> >
> > For YARN session clusters:
> > 1. Current "execution.attached=true" would be equivalent to
> > "execution.shutdown-on-attached-exit=true" +
> > "client.attached.after.submission=true".
> > 2. Current "execution.attached=false" would be equivalent to
> > "execution.shutdown-on-attached-exit=false", i.e. the cluster will keep
> > running until explicitly stopped.
> >
> > I am not sure what the current behavior of "execution.attached=true" +
> > "execution.shutdown-on-attached-exit=false" is. Supposedly, it should be
> > equivalent to "execution.shutdown-on-attached-exit=false", which means
> > "execution.attached" only controls the client side behavior, while the
> > cluster side behavior is controlled by
> > "execution.shutdown-on-attached-exit".
> >
> > For K8S session clusters:
> > 1. Current "execution.attached=true" would be equivalent to
> > "execution.shutdown-on-attached-exit=false".
> > 2. Current "execution.attached=false" would be equivalent to
> > "execution.shutdown-on-attached-exit=true" +
> > "client.attached.after.submission=true".
> >
> > This will make the same config behave the same for YARN and K8S.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Aug 22, 2023 at 11:04 PM Weihua Hu 
> wrote:
> >
> > > Hi, Jiangjie
> > >
> > > 'execution.attached' can be used to attach an existing cluster and stop
> > it
> > > [1][2],
> > > which is not related to job submission. So does YARN session mode[3].
> > > IMO, this behavior should not be controlled by the new option
> > > 'client.attached.after.submission'.
> > >
> > > [1]
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode
> > > [2]
> > >
> > >
> >
> https://github.com/apache/flink/blob/a85ffc491874ecf3410f747df3ed09f61df52ac6/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cli/KubernetesSessionCli.java#L126
> > > [3]
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode
> > >
> > > Best,
> > > Weihua
> > >
> > >
> > > On Tue, Aug 22, 2023 at 5:16 PM Becket Qin 
> wrote:
> > >
> > > > Hi Weihua,
> > > >
> > > > Just want to clarify a little bit, what is the impact of
> > > > `execution.attached` on a cluster startup before a client submits a
> job
> > > to
> > > > that cluster? Does this config only become effective after a job
> > > > submission?
> > > >
> > > > Currently, the cluster behavior has an independent config of
> > > > 'execution.shutdown-on-attached-exit'. So if a client submitted a job
> > in
> > > > attached mode, and this `execution.shutdown-on-attached-exit` is set
> to
> > > > true, the cluster will shutdown if the client detaches from the
> > cluster.
> > > Is
> > > > this sufficient? Or do you mean we need another independent
> > > configuration?
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Tue, Aug 22, 2023 at 2:20 PM Weihua Hu 
> > > wrote:
> > > >
> > > > > Hi Jiangjie
> > > > >
> > > > > Sorry for the late reply, I fully agree with the three user
> sensible
> > > > > behaviors you described.
> > > > >
> > > > > I 

[jira] [Created] (FLINK-32944) Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.

2023-08-23 Thread Tan Kim (Jira)
Tan Kim created FLINK-32944:
---

 Summary: Caused by: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
 Key: FLINK-32944
 URL: https://issues.apache.org/jira/browse/FLINK-32944
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Tan Kim


When I try to convert a Table to a DataStream through the Table API's bridge, 
the following error occurs if the POJO class contains a List field.

 
{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: Error 
while generating structured type converter.
    at 
org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:89)
    at 
org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:76)
    at 
org.apache.flink.table.runtime.typeutils.ExternalSerializer.initializeConverter(ExternalSerializer.java:217)
    at 
org.apache.flink.table.runtime.typeutils.ExternalSerializer.<init>(ExternalSerializer.java:78)
    at 
org.apache.flink.table.runtime.typeutils.ExternalSerializer.of(ExternalSerializer.java:93)
    at 
org.apache.flink.table.runtime.typeutils.ExternalTypeInfo.createExternalTypeSerializer(ExternalTypeInfo.java:97)
    at 
org.apache.flink.table.runtime.typeutils.ExternalTypeInfo.of(ExternalTypeInfo.java:67)
    at 
org.apache.flink.table.planner.connectors.ExternalDynamicSink.lambda$getSinkRuntimeProvider$2(ExternalDynamicSink.java:115)
    at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:512)
    at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:218)
    at 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:176)
    at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
    at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
    at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:197)
    at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
    at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
    at 
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:151)

..

Caused by: org.apache.flink.util.FlinkRuntimeException: 
org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
    at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
    at 
org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80)
    ... 31 more
Caused by: 
org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
 org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
compiled. This is a bug. Please file an issue.
    at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
    at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
    at 
org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
    at 
org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
    ... 32 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
cannot be compiled. This is a bug. Please file an issue.
    at 
org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
    at 
org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
    at 
org.apache.fli

Re: [DISCUSS] Update Flink Roadmap

2023-08-23 Thread Jing Ge
Thanks Jark, +1 for the OLAP :-)

Best regards,
Jing

On Sun, Aug 20, 2023 at 5:04 PM Jark Wu  wrote:

> Hi all,
>
> I have addressed another bunch of comments on the Google doc (mainly about
> the OLAP roadmap).
> And I have opened a pull request for the website:
> https://github.com/apache/flink-web/pull/672
>
> Please help to review it and continue the discussion on the pull request,
> thanks a lot!
>
> Best,
> Jark
>
> On Tue, 15 Aug 2023 at 12:15, Xintong Song  wrote:
>
> > Thanks for driving this, Jark.
> >
> > The current draft looks good to me. I think it is good to open a PR with
> > it. And if there are other comments, we can discuss them during the PR
> > review.
> >
> > I also added a few minor comments in the draft regarding the feature
> radar.
> > Those can also be discussed on the PR.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Tue, Aug 15, 2023 at 11:15 AM Shammon FY  wrote:
> >
> > > Hi Jark,
> > >
> > > Sounds good and I would love to, thanks! I will involve you and
> Xintong
> > > on the document after updating.
> > >
> > > Best,
> > > Shammon FY
> > >
> > >
> > > On Mon, Aug 14, 2023 at 10:39 PM Jark Wu  wrote:
> > >
> > >> Hi Shammon,
> > >>
> > >> Sure, could you help to draft a subsection about this in the google
> doc?
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >> On Aug 14, 2023 at 20:30, Shammon FY  wrote:
> > >>
> > >> Thanks @Jark for driving the Flink Roadmap.
> > >>
> > >> As we discussed olap in the thread [1] and according to the
> suggestions
> > >> from @Xintong Song, could we add a subsection in `Towards Streaming
> > >> Warehouses` or `Performance` that the short-lived query in Flink
> Session
> > >> Cluster is one of the future directions for Flink?
> > >>
> > >> Best,
> > >> Shammon FY
> > >>
> > >> On Mon, Aug 14, 2023 at 8:03 PM Jark Wu  wrote:
> > >>
> > >>> Thank you everyone for helping polish the roadmap [1].
> > >>>
> > >>> I think I have addressed all the comments and we have included all
> > >>> ongoing
> > >>> parts of Flink.
> > >>> Please feel free to take a last look. I'm going to prepare the pull
> > >>> request
> > >>> if there are no more concerns.
> > >>>
> > >>> Best,
> > >>> Jark
> > >>>
> > >>> [1]:
> > >>>
> > >>>
> >
> https://docs.google.com/document/d/12BDiVKEsY-f7HI3suO_IxwzCmR04QcVqLarXgyJAb7c/edit
> > >>>
> > >>> On Sun, 13 Aug 2023 at 13:04, Yuan Mei 
> wrote:
> > >>>
> > >>> > Sorry for taking so long
> > >>> >
> > >>> > I've added a section about Flink Disaggregated State Management
> > >>> Evolution
> > >>> > in the attached doc.
> > >>> >
> > >>> > I found some of the contents might be overlapped with the
> > "large-scale
> > >>> > streaming jobs". So that part might need some changes as well.
> > >>> >
> > >>> > Please let me know what you think.
> > >>> >
> > >>> > Best
> > >>> > Yuan
> > >>> >
> > >>> > On Mon, Jul 24, 2023 at 12:07 PM Yuan Mei 
> > >>> wrote:
> > >>> >
> > >>> > > Sorry have missed this email and respond a bit late.
> > >>> > >
> > >>> > > I will put a draft for the long-term vision for the state as well
> > as
> > >>> > > large-scale state support into the roadmap.
> > >>> > >
> > >>> > > Best
> > >>> > > Yuan
> > >>> > >
> > >>> > > On Mon, Jul 17, 2023 at 10:34 AM Jark Wu 
> wrote:
> > >>> > >
> > >>> > >> Hi Jiabao,
> > >>> > >>
> > >>> > >> Thank you for your suggestions. I have added them to the "Going
> > >>> Beyond a
> > >>> > >> SQL Stream/Batch Processing Engine" and "Large-Scale State Jobs"
> > >>> > sections.
> > >>> > >>
> > >>> > >> Best,
> > >>> > >> Jark
> > >>> > >>
> > >>> > >> On Thu, 13 Jul 2023 at 16:06, Jiabao Sun <
> jiabao@xtransfer.cn
> > >>> > >> .invalid>
> > >>> > >> wrote:
> > >>> > >>
> > >>> > >> > Thanks Jark and Martijn for driving this.
> > >>> > >> >
> > >>> > >> > There are two suggestions about the Table API:
> > >>> > >> >
> > >>> > >> > - Add the JSON type to adapt to the no sql database type.
> > >>> > >> > - Remove changelog normalize operator for upsert stream.
> > >>> > >> >
> > >>> > >> >
> > >>> > >> > Best,
> > >>> > >> > Jiabao
> > >>> > >> >
> > >>> > >> >
> > >>> > >> > > On Jul 13, 2023 at 3:49 PM, Jark Wu  wrote:
> > >>> > >> > >
> > >>> > >> > > Hi all,
> > >>> > >> > >
> > >>> > >> > > Sorry for taking so long back here.
> > >>> > >> > >
> > >>> > >> > > Martijn and I have drafted the first version of the updated
> > >>> roadmap,
> > >>> > >> > > including the updated feature radar reflecting the current
> > >>> state of
> > >>> > >> > > different components.
> > >>> > >> > >
> > >>> > >> >
> > >>> > >>
> > >>> >
> > >>>
> >
> https://docs.google.com/document/d/12BDiVKEsY-f7HI3suO_IxwzCmR04QcVqLarXgyJAb7c/edit
> > >>> > >> > >
> > >>> > >> > > Feel free to leave comments in the thread or the document.
> > >>> > >> > > We may miss mentioning something important, so your help in
> > >>> > enriching
> > >>> > >> > > the content is greatly appreciated.
> > >>> > >> > >
> > >>> > >> > > Best,
> > >>> > >> > > Jark & Martijn
> > >>> > >> > >
> > >>> > >> > >
> > >>> > >> > > On Fri, 

[jira] [Created] (FLINK-32943) sometime run batch tasks concurrently, the tasks still in the initialization status

2023-08-23 Thread zhu (Jira)
zhu created FLINK-32943:
---

 Summary: sometime run batch tasks concurrently, the tasks still in 
the initialization status
 Key: FLINK-32943
 URL: https://issues.apache.org/jira/browse/FLINK-32943
 Project: Flink
  Issue Type: Bug
 Environment: flink 1.15.2

 
|*blob.server.port*|6124|
|*classloader.resolve-order*|parent-first|
|*jobmanager.execution.failover-strategy*|region|
|*jobmanager.memory.heap.size*|2228014280b|
|*jobmanager.memory.jvm-metaspace.size*|536870912b|
|*jobmanager.memory.jvm-overhead.max*|322122552b|
|*jobmanager.memory.jvm-overhead.min*|322122552b|
|*jobmanager.memory.off-heap.size*|134217728b|
|*jobmanager.memory.process.size*|3gb|
|*jobmanager.rpc.address*|naf-flink-ms-flink-manager-1-4gcwz|
|*jobmanager.rpc.port*|6123|
|*parallelism.default*|1|
|*query.server.port*|6125|
|*rest.address*|0.0.0.0|
|*rest.bind-address*|0.0.0.0|
|*rest.connection-timeout*|6|
|*rest.server.numThreads*|8|
|*slot.request.timeout*|300|
|*state.backend.rocksdb.localdir*|/home/nafplat/data/flinkStateStore|
|*state.backend.type*|rocksdb|
|*taskmanager.bind-host*|0.0.0.0|
|*taskmanager.host*|0.0.0.0|
|*taskmanager.memory.framework.off-heap.batch-shuffle.size*|256mb|
|*taskmanager.memory.framework.off-heap.size*|512mb|
|*taskmanager.memory.managed.fraction*|0.4|
|*taskmanager.memory.network.fraction*|0.2|
|*taskmanager.memory.process.size*|16gb|
|*taskmanager.memory.task.off-heap.size*|268435456bytes|
|*taskmanager.numberOfTaskSlots*|6|
|*taskmanager.runtime.large-record-handler*|true|
|*web.submit.enable*|true|
|*web.tmpdir*|/tmp/flink-web-4be192ba-870a-4f88-8185-d07fa6303cca|
|*web.upload.dir*|/opt/flink/nafJar|
Reporter: zhu


I run a Flink 1.15.2 session cluster on Kubernetes. In most cases there is no 
problem, but sometimes tasks stay in the initializing state indefinitely, and 
subsequent tasks also get stuck in initialization.

I run batch jobs with a concurrency of 6; the JobManager has 2 CPUs and 3 GB 
of memory.

This problem always occurs. It seems that there is a deadlock during 
initialization, but my job does not have any deadlock issues.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Heartbeat of TaskManager with id container

2023-08-23 Thread xiangyu feng
Hi Nagireddy,

I'm not sure how you are monitoring Kafka lag. AFAIK, you can check the
consumer group metadata for the topic in your Kafka cluster to see the actual
lag with the following command.

./kafka-consumer-groups.sh --bootstrap-server 192.168.0.107:39092
--group  --describe


This tool ships with the Kafka distribution. If there is any gap between the
lag reported by the Kafka connector and this tool's output, you can open a
JIRA ticket to report the issue [1].

Hope this helps you!

[1]
https://issues.apache.org/jira/secure/RapidBoard.jspa?rapidView=582&projectKey=FLINK
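
As for the negative value itself: per-partition lag is just the log-end
offset minus the committed offset, and since the two values are fetched at
different instants, a racing read can briefly make the raw difference
negative. A minimal sketch of that computation (hypothetical offset values,
not actual Kafka API calls):

```java
public class LagSketch {
    // Consumer lag per partition = log-end offset - committed offset.
    // Clamped at zero: the two offsets are fetched at different instants,
    // so a racing read can briefly make the raw difference negative.
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(logEndOffset - committedOffset, 0L);
    }

    public static void main(String[] args) {
        System.out.println(lag(1_000L, 990L));   // 10 messages behind
        System.out.println(lag(1_000L, 1_003L)); // raw diff is -3, clamped to 0
    }
}
```

A persistent (rather than transient) negative value would point at a real
metric bug worth reporting.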

Regards,
Xiangyu




Y SREEKARA BHARGAVA REDDY  wrote on Mon, Aug 21, 2023 at 18:45:

> Thanks Xiangyu,
>
> I have one issue, while running flink with kafka connector. its a working
> fine for couple of days.
>
> But suddenly  kafka lag went to "Negative value"
>
> I am trying to find the root cause for that. Any suggestions?
>
>
> On Sat, Aug 5, 2023 at 5:57 PM xiangyu feng  wrote:
>
>> Hi Nagireddy,
>>
>> I'm not particularly familiar with StreamingFileSink, but I checked the
>> implementation of HadoopFsCommitter. AFAIK, when committing files to
>> HDFS, the committer first checks whether the staging (temp) file exists.
>>
>> In your case, could you check why the staging temp file does not exist on
>> HDFS? Were these files deleted by mistake? From what I found, this error
>> may occur when small-file merging compacts a file that is still being
>> written. You can disable small-file merging when writing files.
>>
>> Hope this helps.
>>
>> Regards,
>> Xiangyu
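
The check described above can be sketched outside of Flink with plain
java.nio (the method shape and error message are taken from the stack trace
quoted in this thread; the HDFS-specific recovery details are omitted, so
this is an illustration, not Flink's actual code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CommitSketch {
    // Simplified shape of HadoopFsCommitter.commit: refuse to commit when
    // the staging (in-progress) file has disappeared, otherwise promote it
    // to its final name.
    static void commit(Path staging, Path target) throws IOException {
        if (!Files.exists(staging)) {
            throw new IOException("Cannot clean commit: Staging file does not exist.");
        }
        Files.move(staging, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("commit-sketch");
        Path staging = dir.resolve("part-0.inprogress");
        Path target = dir.resolve("part-0");
        Files.writeString(staging, "data");

        commit(staging, target); // succeeds: staging file is present

        try {
            // staging is now gone, e.g. removed by an external compaction
            commit(staging, target);
            throw new AssertionError("expected IOException");
        } catch (IOException expected) {
            System.out.println("ok");
        }
    }
}
```

The point of the sketch: anything that removes the in-progress file between
checkpoint and commit (manual cleanup, small-file compaction) will surface
as exactly this exception on recovery.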
>>
>>
>>> Y SREEKARA BHARGAVA REDDY  wrote on Sat, Aug 5, 2023 at 18:22:
>>
>>> Hi Xiangyu/Dev,
>>>
>>> Did any one has solution handle below important note in
>>> StreamingFileSink:
>>>
>>> Caused by: java.io.IOException: Cannot clean commit: Staging file does
>>> not exist.
>>> at org.apache.flink.runtime.fs.hdfs.
>>> HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(
>>> HadoopRecoverableFsDataOutputStream.java:250)
>>>
>>> Important Note 3: Flink and the StreamingFileSink never overwrites
>>> committed data. Given this, when trying to restore from an old
>>> checkpoint/savepoint which assumes an in-progress file which was committed
>>> by subsequent successful checkpoints, *Flink will refuse to resume and
>>> it will throw an exception as it cannot locate the in-progress file*.
>>>
>>> Currently i am facing same issue in the PROD code.
>>>
>>>
>>>
>>> Regards,
>>> Nagireddy Y.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Aug 4, 2023 at 12:11 PM xiangyu feng 
>>> wrote:
>>>
 Hi ynagireddy4u,

 From the exception info, I think your application has met a HDFS file
 issue during the commit phase of checkpoint. Can u check why 'Staging file
 does not exist' in the first place?

 Regards,
 Xiangyu

 Y SREEKARA BHARGAVA REDDY  wrote on Fri, Aug 4, 2023 at
 12:21:

> Hi Xiangyu/Dev Team,
>
> Thanks for reply.
>
> In  our flink job, we increase the *checkpoint timeout to 30 min.*
> And the *checkpoint interval is 10 min.*
>
> But while running the job we got below exception.
>
> java.lang.RuntimeException: Error while confirming checkpoint
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .notifyCheckpointComplete(StreamTask.java:952)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .lambda$notifyCheckpointCompleteAsync$7(StreamTask.java:924)
> at org.apache.flink.util.function.FunctionUtils
> .lambda$asCallable$5(FunctionUtils.java:125)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.streaming.runtime.tasks.
> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(
> StreamTaskActionExecutor.java:87)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail
> .java:78)
> at org.apache.flink.streaming.runtime.tasks.mailbox.
> MailboxProcessor.processMail(MailboxProcessor.java:261)
> at org.apache.flink.streaming.runtime.tasks.mailbox.
> MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .runMailboxLoop(StreamTask.java:487)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot clean commit: Staging file
> does not exist.
> at org.apache.flink.runtime.fs.hdfs.
> HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(
> HadoopRecoverableFsDataOutputStream.java:250)
> at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket
> .onSuccessfulCompletionOfCheckpoint(Bucket.java:300)
> at org.apache.flink.streaming.api.functions.sink.filesystem.
> Buckets.commitUpToCheckpoint(Buc

Fwd: [Discussion] Slack Channel

2023-08-23 Thread Jing Ge
Hi devs,

Thanks Giannis for your suggestion. It seems that the last email wasn't
sent to the dev ML. It is also an interesting topic for devs and user-zh.

Best regards,
Jing

-- Forwarded message -
From: Giannis Polyzos 
Date: Tue, Aug 22, 2023 at 11:11 AM
Subject: [Discussion] Slack Channel
To: user , 


Hello folks,
considering how Apache Flink is gaining more and more popularity, and seeing
how other open-source projects use Slack, I wanted to start this thread to
see how we can grow the community.
First of all, one thing I have noticed: even people who have been involved
with Flink for a while only lately start noticing that there is actually an
open-source Slack workspace.
Maybe we can somehow help raise awareness around that, for example by
inviting people to join.
The other part is around the channels.
Currently, there is only one channel #troubleshooting for people to ask
questions.
I believe this creates a few limitations: there are quite a few questions
daily, but it's hard to differentiate between topics, and people with
context on specific parts can't identify relevant questions easily.
I'm thinking it would be nice to create more channels like:
#flink-cdc
#flink-paimon
#flink-datastream
#pyflink
#flink-sql
#flink-statebackends
#flink-monitoring
#flink-k8s
#job-board etc.
to help people have more visibility on different topics, make it easier to
find answers to similar questions, and search for things of interest.
This can be a first step towards growing the community.

Best,
Giannis