[jira] [Created] (FLINK-34128) oracle jdbc connector bug. When the oracle table field is of type float, the type obtained by jdbc is bigdecimal. error is 'java.lang.ClassCastException: java.math.BigD

2024-01-16 Thread blackpighe (Jira)
blackpighe created FLINK-34128:
--

 Summary: oracle jdbc connector bug. When the oracle table field is 
of type float, the type obtained by jdbc is bigdecimal. error is  
'java.lang.ClassCastException: java.math.BigDecimal cannot be cast to 
java.lang.Float'
 Key: FLINK-34128
 URL: https://issues.apache.org/jira/browse/FLINK-34128
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: jdbc-3.1.0, jdbc-3.0.0
 Environment: All current versions of flink-jdbc-connector
Reporter: blackpighe
 Fix For: jdbc-3.1.1, jdbc-3.0.0
 Attachments: image-2024-01-17-14-33-05-713.png

Creating an Oracle table that contains a FLOAT field and executing SQL against it produces an error:

java.lang.ClassCastException: java.math.BigDecimal cannot be cast to 
java.lang.Float

 

The cause of the error is located in:

org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java

!image-2024-01-17-14-33-05-713.png!

Object field = resultSet.getObject(pos + 1);

For the Oracle JDBC driver, this call returns the wrong type.

Expected: a java.lang.Float value

Actual: a java.math.BigDecimal value

 

Suggested modification:

Object field =
resultSet.getObject(pos + 1, rowType.getTypeAt(pos).getDefaultConversion());

This specifies the target type explicitly according to the schema. However, the 
mocked test cases then fail. Let's talk about it and see what we can do about it.
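
For illustration, a minimal standalone sketch of the schema-aware read (the class and method below are invented for this example; only the RowType/LogicalType API and the JDBC 4.1 ResultSet#getObject(int, Class) overload are existing APIs):

```
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.flink.table.types.logical.RowType;

/**
 * Illustrative sketch only; not the actual AbstractJdbcRowConverter code.
 * The expected Java class is resolved from the Flink schema so that the JDBC
 * driver performs the conversion, e.g. an Oracle FLOAT column is returned as
 * java.lang.Float instead of the driver default java.math.BigDecimal.
 */
public final class SchemaAwareFieldReader {

    private SchemaAwareFieldReader() {}

    public static Object readField(ResultSet resultSet, RowType rowType, int pos) throws SQLException {
        // Class that Flink expects for this logical type, e.g. java.lang.Float for FLOAT.
        Class<?> conversionClass = rowType.getTypeAt(pos).getDefaultConversion();
        // JDBC 4.1 overload: the driver converts the column value to the requested class.
        return resultSet.getObject(pos + 1, conversionClass);
    }
}
```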

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34127) Kafka connector repo runs a duplicate of `IntegrationTests` framework tests

2024-01-16 Thread Mason Chen (Jira)
Mason Chen created FLINK-34127:
--

 Summary: Kafka connector repo runs a duplicate of 
`IntegrationTests` framework tests
 Key: FLINK-34127
 URL: https://issues.apache.org/jira/browse/FLINK-34127
 Project: Flink
  Issue Type: Improvement
  Components: Build System / CI, Connectors / Kafka
Affects Versions: kafka-3.0.2
Reporter: Mason Chen


I found this behavior while troubleshooting CI flakiness. These integration 
tests make heavy use of CI resources since they require Kafka, Zookeeper, and Docker 
containers. We can further stabilize CI by not running this set of tests 
redundantly.


`grep -E ".*testIdleReader\[TestEnvironment.*" 14_Compile\ and\ test.txt` 
returns:

```

2024-01-17T00:51:05.2943150Z Test 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: 
[org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
 Semantic: [EXACTLY_ONCE]] is running.
2024-01-17T00:51:07.6922535Z Test 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: 
[org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@43e9a8a2],
 Semantic: [EXACTLY_ONCE]] successfully run.
2024-01-17T00:56:27.1326332Z Test 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: 
[org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
 Semantic: [EXACTLY_ONCE]] is running.
2024-01-17T00:56:28.4000830Z Test 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceITTest.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: 
[org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContext@2db4a84a],
 Semantic: [EXACTLY_ONCE]] successfully run.
2024-01-17T00:56:58.7830792Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
[EXACTLY_ONCE]] is running.
2024-01-17T00:56:59.0544092Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
[EXACTLY_ONCE]] successfully run.
2024-01-17T00:56:59.3910987Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: [EXACTLY_ONCE]] 
is running.
2024-01-17T00:56:59.6025298Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: [EXACTLY_ONCE]] 
successfully run.
2024-01-17T00:57:37.8378640Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
[EXACTLY_ONCE]] is running.
2024-01-17T00:57:38.0144732Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-PARTITION], Semantic: 
[EXACTLY_ONCE]] successfully run.
2024-01-17T00:57:38.2004796Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: [EXACTLY_ONCE]] 
is running.
2024-01-17T00:57:38.4072815Z Test 
org.apache.flink.connector.kafka.source.KafkaSourceITCase.IntegrationTests.testIdleReader[TestEnvironment:
 [MiniCluster], ExternalContext: [KafkaSource-TOPIC], Semantic: [EXACTLY_ONCE]] 
successfully run.
2024-01-17T01:06:11.2933375Z Test 
org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
 [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
[EXACTLY_ONCE]] is running.
2024-01-17T01:06:12.1790031Z Test 
org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
 [FlinkContainers], ExternalContext: [KafkaSource-PARTITION], Semantic: 
[EXACTLY_ONCE]] successfully run.
2024-01-17T01:06:12.5703927Z Test 
org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
 [FlinkContainers], ExternalContext: [KafkaSource-TOPIC], Semantic: 
[EXACTLY_ONCE]] is running.
2024-01-17T01:06:13.3369574Z Test 
org.apache.flink.tests.util.kafka.KafkaSourceE2ECase.testIdleReader[TestEnvironment:
 [FlinkContainers], ExternalContext: [KafkaSource-TOPIC], Semantic: 
[EXACTLY_ONCE]] successfully run.

```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34126) Correct the description of jobmanager.scheduler

2024-01-16 Thread Junrui Li (Jira)
Junrui Li created FLINK-34126:
-

 Summary: Correct the description of jobmanager.scheduler
 Key: FLINK-34126
 URL: https://issues.apache.org/jira/browse/FLINK-34126
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Reporter: Junrui Li


Currently, the config option jobmanager.scheduler has the following description:

_Determines which scheduler implementation is used to schedule tasks. Accepted 
values are:_
 * _'Default': Default scheduler_
 * _'Adaptive': Adaptive scheduler. More details can be found 
[here|https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/elastic_scaling#adaptive-scheduler]._
 * _'AdaptiveBatch': Adaptive batch scheduler. More details can be found 
[here|https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/elastic_scaling#adaptive-batch-scheduler]._

_Possible values:_
 * _"Default"_
 * _"Adaptive"_
 * _"AdaptiveBatch"_

However, since FLIP-283 the default scheduler for batch jobs is the 
AdaptiveBatchScheduler. The current description may mislead users into thinking 
that the 'DefaultScheduler' is the universal default for both batch and streaming 
jobs.

 

We should update this description.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-16 Thread Yang Wang
I am completely in favor of splitting the LeaderServices and
PersistenceServices, while sharing the concern that MaterialProvider is not
very easy to understand. It just feels like we do the separation, but not
thoroughly.

If you have a clear plan for the subsequent improvements, I am fine with
focusing only on the OLAP requirements in FLIP-403.


Best,
Yang

On Wed, Jan 17, 2024 at 11:40 AM Yangze Guo  wrote:

> Thanks for the comments, Zhu.
>
> > Did you look into which part takes most of the time? Jar uploading, Jar
> downloading, JobInformation shipping, TDD shipping, or others?
>
> In our scenario, the key factor should be the JobInformation shipping,
> as the jobs are completed within 1 second. This can have a big impact
> on the QPS.
>
> > If these objects are large, e.g. a hundreds megabytes connector jar,
> will ship it hundreds of times(if parallelism > 100) from JMs to TMs be a
> blocker of performance and stability, compared letting the DFS help with
> the shipping... I'm fine to use a void blobService in OLAP scenarios *by
> default* if it works better in most cases.
>
> Thanks for the input. Currently, in our scenario, the connector jars
> are pre-deployed on the JM and TM, and each job submission only
> includes the serialized JobGraph. However, if there are custom
> connectors and UDFs involved in the future, I believe choosing the
> appropriate blob strategy will indeed require a further analysis. So,
> +1 for providing users with the option to switch between blob
> services. high-availability.blob-store.enabled sounds good from my
> side. We can set it to false if it is not manually configured and if
> high-availability.job-recovery.enabled is set to false.
>
> If there are no further comments, I will adjust the FLIP based on
> these discussions and then initiate a vote.
>
> Best,
> Yangze Guo
>
> On Mon, Jan 15, 2024 at 5:55 PM Zhu Zhu  wrote:
> >
> > Correction:
> > I'm fine to use a void blobService in OLAP scenarios if it works better
> > in most cases.  -> I'm fine to use a void blobService in OLAP scenarios
> > *by default* if it works better in most cases.
> >
> >
> >
> > Zhu Zhu  wrote on Mon, Jan 15, 2024 at 17:51:
> >
> > > @Yangze
> > >
> > > > (with 128 parallelism WordCount jobs), disabling BlobStore resulted
> in a
> > > 100% increase in QPS
> > >
> > > Did you look into which part takes most of the time? Jar uploading, Jar
> > > downloading, JobInformation shipping, TDD shipping, or others?
> > >
> > > If these objects are large, e.g. a hundreds megabytes connector jar,
> > > will ship it hundreds of times(if parallelism > 100) from JMs to TMs
> > > be a blocker of performance and stability, compared letting the DFS
> > > help with the shipping. If yes, we should not force it to use a void
> > > blobService. Maybe an option should be given to users to switch between
> > > blobServices?
> > >
> > > I'm fine to use a void blobService in OLAP scenarios if it works better
> > > in most cases. However, it is a bit weird that we disable blobs if
> > > `enable-job-recovery=false`. Conceptually, they should be unrelated.
> > >
> > > > As Matthias mentioned, each component still needs to write its RPC
> > > address, so this part of the writing may be unavoidable.
> > >
> > > Thanks Matthias for the inputs.
> > > However, even in non-ha mode, that task manager can connect to
> JobMaster.
> > > Therefore, I guess it's not necessary to store JM addresses externally.
> > > I noticed `HighAvailabilityServices#getJobManagerLeaderRetriever`
> > > accepts a parameter `defaultJobManagerAddress`. So maybe it's not
> needed
> > > for TMs to find out the addresses of JMs via external services?
> > >
> > > > focus on the discussion of HA functionality in the OLAP scenario in
> > > FLIP-403 and exclude the refactoring from the scope of this FLIP
> > >
> > > It sounds good to me.
> > > Actually the concept of separating leader election and persistence
> > > looks great to me at the first glance. But the shared MaterialProvider
> > > makes it more complicated than I had expected.
> > >
> > > Thanks,
> > > Zhu
> > >
> > > Yangze Guo  wrote on Thu, Jan 11, 2024 at 14:53:
> > >
> > >> Thanks for the comments, Zhu and Matthias.
> > >>
> > >> @Zhu Zhu
> > >>
> > >> > How about disabling the checkpoint to avoid the cost? I know the
> cost
> > >> is there even if we disable the checkpoint at the moment. But I think
> it
> > >> can be fixed.
> > >> > If HA is disabled, the jobmanager needs to directly participate in
> all
> > >> blob shipping work which may result in a hot-spot.
> > >>
> > >> Currently, there are several persistence services that have specific
> > >> implementations based on the HA mode:
> > >> - JobGraphStore and JobResultStore: These are related to job recovery
> > >> and can cause significant redundant I/O in OLAP scenarios, impacting
> > >> performance. It may be necessary to configure them as in-memory stores
> > >> for OLAP.
> > >> - CompletedCheckpointStore: As @Zhu Zhu mentioned, we can avoid this
> > >> 

[jira] [Created] (FLINK-34125) Flink 2.0: Remove deprecated serialization config methods and options

2024-01-16 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34125:
-

 Summary: Flink 2.0: Remove deprecated serialization config methods 
and options
 Key: FLINK-34125
 URL: https://issues.apache.org/jira/browse/FLINK-34125
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System, Runtime / Configuration
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34124) Flink 2.0: Disable Kryo by default

2024-01-16 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34124:
-

 Summary: Flink 2.0: Disable Kryo by default
 Key: FLINK-34124
 URL: https://issues.apache.org/jira/browse/FLINK-34124
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System, Runtime / Configuration
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34123) Introduce built-in serializers for common composited data types

2024-01-16 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34123:
-

 Summary: Introduce built-in serializers for common composited data 
types
 Key: FLINK-34123
 URL: https://issues.apache.org/jira/browse/FLINK-34123
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-403: High Availability Services for OLAP Scenarios

2024-01-16 Thread Yangze Guo
Thanks for the comments, Zhu.

> Did you look into which part takes most of the time? Jar uploading, Jar 
> downloading, JobInformation shipping, TDD shipping, or others?

In our scenario, the key factor should be the JobInformation shipping,
as the jobs are completed within 1 second. This can have a big impact
on the QPS.

> If these objects are large, e.g. a hundreds megabytes connector jar, will 
> ship it hundreds of times(if parallelism > 100) from JMs to TMs be a blocker 
> of performance and stability, compared letting the DFS help with the 
> shipping... I'm fine to use a void blobService in OLAP scenarios *by default* 
> if it works better in most cases.

Thanks for the input. Currently, in our scenario, the connector jars
are pre-deployed on the JM and TM, and each job submission only
includes the serialized JobGraph. However, if there are custom
connectors and UDFs involved in the future, I believe choosing the
appropriate blob strategy will indeed require further analysis. So,
+1 for providing users with the option to switch between blob
services. high-availability.blob-store.enabled sounds good from my
side. We can set it to false if it is not manually configured and if
high-availability.job-recovery.enabled is set to false.
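
For illustration, a minimal sketch of the defaulting rule described above. Note that both options are only proposed in this thread and do not exist in Flink yet; the option definitions below are assumptions made for the example, while the ConfigOptions/Configuration APIs are existing Flink APIs:

```
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class HaBlobStoreDefaultingSketch {

    // Options proposed in this thread; defined here only for the sketch.
    static final ConfigOption<Boolean> JOB_RECOVERY_ENABLED =
            ConfigOptions.key("high-availability.job-recovery.enabled")
                    .booleanType()
                    .defaultValue(true);

    static final ConfigOption<Boolean> BLOB_STORE_ENABLED =
            ConfigOptions.key("high-availability.blob-store.enabled")
                    .booleanType()
                    .noDefaultValue();

    /** If the blob-store option is not configured manually, fall back to the job-recovery flag. */
    static boolean isBlobStoreEnabled(Configuration conf) {
        return conf.getOptional(BLOB_STORE_ENABLED)
                .orElseGet(() -> conf.get(JOB_RECOVERY_ENABLED));
    }
}
```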

If there are no further comments, I will adjust the FLIP based on
these discussions and then initiate a vote.

Best,
Yangze Guo

On Mon, Jan 15, 2024 at 5:55 PM Zhu Zhu  wrote:
>
> Correction:
> I'm fine to use a void blobService in OLAP scenarios if it works better
> in most cases.  -> I'm fine to use a void blobService in OLAP scenarios
> *by default* if it works better in most cases.
>
>
>
> Zhu Zhu  wrote on Mon, Jan 15, 2024 at 17:51:
>
> > @Yangze
> >
> > > (with 128 parallelism WordCount jobs), disabling BlobStore resulted in a
> > 100% increase in QPS
> >
> > Did you look into which part takes most of the time? Jar uploading, Jar
> > downloading, JobInformation shipping, TDD shipping, or others?
> >
> > If these objects are large, e.g. a hundreds megabytes connector jar,
> > will ship it hundreds of times(if parallelism > 100) from JMs to TMs
> > be a blocker of performance and stability, compared letting the DFS
> > help with the shipping. If yes, we should not force it to use a void
> > blobService. Maybe an option should be given to users to switch between
> > blobServices?
> >
> > I'm fine to use a void blobService in OLAP scenarios if it works better
> > in most cases. However, it is a bit weird that we disable blobs if
> > `enable-job-recovery=false`. Conceptually, they should be unrelated.
> >
> > > As Matthias mentioned, each component still needs to write its RPC
> > address, so this part of the writing may be unavoidable.
> >
> > Thanks Matthias for the inputs.
> > However, even in non-ha mode, that task manager can connect to JobMaster.
> > Therefore, I guess it's not necessary to store JM addresses externally.
> > I noticed `HighAvailabilityServices#getJobManagerLeaderRetriever`
> > accepts a parameter `defaultJobManagerAddress`. So maybe it's not needed
> > for TMs to find out the addresses of JMs via external services?
> >
> > > focus on the discussion of HA functionality in the OLAP scenario in
> > FLIP-403 and exclude the refactoring from the scope of this FLIP
> >
> > It sounds good to me.
> > Actually the concept of separating leader election and persistence
> > looks great to me at the first glance. But the shared MaterialProvider
> > makes it more complicated than I had expected.
> >
> > Thanks,
> > Zhu
> >
> > Yangze Guo  wrote on Thu, Jan 11, 2024 at 14:53:
> >
> >> Thanks for the comments, Zhu and Matthias.
> >>
> >> @Zhu Zhu
> >>
> >> > How about disabling the checkpoint to avoid the cost? I know the cost
> >> is there even if we disable the checkpoint at the moment. But I think it
> >> can be fixed.
> >> > If HA is disabled, the jobmanager needs to directly participate in all
> >> blob shipping work which may result in a hot-spot.
> >>
> >> Currently, there are several persistence services that have specific
> >> implementations based on the HA mode:
> >> - JobGraphStore and JobResultStore: These are related to job recovery
> >> and can cause significant redundant I/O in OLAP scenarios, impacting
> >> performance. It may be necessary to configure them as in-memory stores
> >> for OLAP.
> >> - CompletedCheckpointStore: As @Zhu Zhu mentioned, we can avoid this
> >> overhead by disabling checkpoints. I agree to remove Checkpoint
> >> Storage from the scope of this FLIP.
> >> - BlobStore: Agree that disabling BlobStore can potentially lead to
> >> hotspots in JobManagers. However, enabling it in OLAP scenarios can
> >> also result in high external storage access overhead , e.g.
> >> JobInformation/ShuffleDescriptor in TDD. I think this is a trade-off.
> >> In our internal benchmark for short query (with 128 parallelism
> >> WordCount jobs), disabling BlobStore resulted in a 100% increase in
> >> QPS. Therefore, I lean towards disabling it. WDYT?
> >>
> >> > 

[jira] [Created] (FLINK-34122) Deprecate old serialization config methods and options

2024-01-16 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34122:
-

 Summary: Deprecate old serialization config methods and options
 Key: FLINK-34122
 URL: https://issues.apache.org/jira/browse/FLINK-34122
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System, Runtime / Configuration
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34121) Introduce pipeline.force-kryo-avro to control whether to force registration of Avro serializer with Kryo

2024-01-16 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34121:
-

 Summary: Introduce pipeline.force-kryo-avro to control whether to 
force registration of Avro serializer with Kryo
 Key: FLINK-34121
 URL: https://issues.apache.org/jira/browse/FLINK-34121
 Project: Flink
  Issue Type: Sub-task
  Components: API / Type Serialization System, Runtime / Configuration
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34120) Introduce unified serialization config option for all Kryo, POJO and customized serializers

2024-01-16 Thread Zhanghao Chen (Jira)
Zhanghao Chen created FLINK-34120:
-

 Summary: Introduce unified serialization config option for all 
Kryo, POJO and customized serializers
 Key: FLINK-34120
 URL: https://issues.apache.org/jira/browse/FLINK-34120
 Project: Flink
  Issue Type: New Feature
  Components: API / Type Serialization System, Runtime / Configuration
Reporter: Zhanghao Chen






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34119) Improve description about changelog in document

2024-01-16 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-34119:


 Summary: Improve description about changelog in document
 Key: FLINK-34119
 URL: https://issues.apache.org/jira/browse/FLINK-34119
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Hangxiang Yu
Assignee: Hangxiang Yu


Since we have resolved some issues and marked changelog as production-ready in the 
release notes ([#generalized-incremental-checkpoint]), we could update the related 
descriptions in the documentation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-417: Expose JobManagerOperatorMetrics via REST API

2024-01-16 Thread Hang Ruan
Hi, Mason.

The field `operatorName` in JobManagerOperatorQueryScopeInfo mirrors the
fields in OperatorQueryScopeInfo, which uses the operatorName instead of the
OperatorID.
It is fine with me to change from operatorName to operatorID in this
FLIP.

Best,
Hang

Mason Chen  wrote on Wed, Jan 17, 2024 at 09:39:

> Hi Xuyang and Hang,
>
> Thanks for your support and feedback! See my responses below:
>
> 1. IIRC, in a sense, operator ID and vertex ID are the same thing. The
> > operator ID can
> > be converted from the vertex ID[1]. Therefore, it is somewhat strange to
> > have both vertex
> > ID and operator ID in a single URL.
> >
> I think Hang explained it perfectly. Essentially, a vertex may contain one
> or more operators, so the operator ID is required to distinguish this case.
>
> 2. If I misunderstood the semantics of operator IDs here, then what is the
> > relationship
> > between vertex ID and operator ID, and do we need a url like
> > `/jobs//vertices//operators/`
> > to list all operator ids under this vertices?
> >
> Good question, we definitely need to expose operator IDs through the REST API
> to make this usable. I'm looking at how users would currently discover the
> vertex id to query. From the supported REST APIs [1], you can currently
> obtain it from
>
> 1. `/jobs/`
> 2. `/jobs//plan`
>
> From the response of both these APIs, they include the vertex ids (the
> vertices AND nodes fields), but not the operator ids. We would need to add
> the logic to the plan generation [2]. The response is a little confusing
> because there is a field in the vertices called "operator name". I propose
> to add a new field called "operators" to the vertex response object, which
> would be a list of objects with the structure
>
> Operator
> {
>   "id": "THE-FLINK-GENERATED-ID"
> }.
>
> The JobManagerOperatorQueryScopeInfo has three fields: jobID, vertexID and
> > operatorName. So we should use the operator name in the API.
> > If you think we should use the operator id, there need be more changes
> > about it.
> >
> I think we should use operator id since it uniquely identifies an
> operator--on the contrary, the operator name does not (it may be empty or
> repeated between operators by the user). I actually had a question on that
> since you implemented the metric group. What's the reason we use operator
> name currently? Could it also use operator id so we can match against the
> id?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/
> [2]
>
> https://github.com/apache/flink/blob/416cb7aaa02c176e01485ff11ab4269f76b5e9e2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java#L53
>
> Best,
> Mason
>
>
> On Thu, Jan 11, 2024 at 10:54 PM Hang Ruan  wrote:
>
> > Hi, Mason.
> >
> > Thanks for driving this FLIP.
> >
> > The JobManagerOperatorQueryScopeInfo has three fields: jobID, vertexID
> and
> > operatorName. So we should use the operator name in the API.
> > If you think we should use the operator id, there need be more changes
> > about it.
> >
> > About Xuyang's questions, we add both vertexID and operatorID
> > information because of the operator chain.
> > An operator chain has a vertexID and contains many different operators. The
> > operator information helps to distinguish them within the same operator
> > chain.
> >
> > Best,
> > Hang
> >
> >
> > Xuyang  wrote on Fri, Jan 12, 2024 at 10:21:
> >
> > > Hi, Mason.
> > > Thanks for driving this FLIP. I think it's important for external systems
> > > to be able to
> > > perceive the metrics of the operator coordinator. +1 for it.
> > >
> > >
> > > I just have the following minor questions and am looking forward to
> your
> > > reply. Please forgive
> > > me if I have some misunderstandings.
> > >
> > >
> > > 1. IIRC, in a sense, operator ID and vertex ID are the same thing. The
> > > operator ID can
> > > be converted from the vertex ID[1]. Therefore, it is somewhat strange
> to
> > > have both vertex
> > > ID and operator ID in a single URL.
> > >
> > >
> > > 2. If I misunderstood the semantics of operator IDs here, then what is
> > the
> > > relationship
> > > between vertex ID and operator ID, and do we need a url like
> > > `/jobs//vertices//operators/`
> > > to list all operator ids under this vertices?
> > >
> > >
> > >
> > >
> > > [1]
> > >
> >
> https://github.com/apache/flink/blob/7bebd2d9fac517c28afc24c0c034d77cfe2b43a6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java#L40C27-L40C27
> > >
> > > --
> > >
> > > Best!
> > > Xuyang
> > >
> > >
> > >
> > >
> > >
> > > At 2024-01-12 04:20:03, "Mason Chen"  wrote:
> > > >Hi Devs,
> > > >
> > > >I'm opening this thread to discuss a short FLIP for exposing
> > > >JobManagerOperatorMetrics via REST API [1].
> > > >
> > > >The current set of REST APIs make it impossible to query coordinator
> > > >metrics. This FLIP proposes a new REST API to query the
> > > >JobManagerOperatorMetrics.
> > > >
> > > >[1]
> > > >
> > >
> >
> 

[jira] [Created] (FLINK-34118) Implement restore tests for Sort node

2024-01-16 Thread Bonnie Varghese (Jira)
Bonnie Varghese created FLINK-34118:
---

 Summary: Implement restore tests for Sort node
 Key: FLINK-34118
 URL: https://issues.apache.org/jira/browse/FLINK-34118
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Bonnie Varghese
Assignee: Bonnie Varghese






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-417: Expose JobManagerOperatorMetrics via REST API

2024-01-16 Thread Mason Chen
Hi Xuyang and Hang,

Thanks for your support and feedback! See my responses below:

1. IIRC, in a sense, operator ID and vertex ID are the same thing. The
> operator ID can
> be converted from the vertex ID[1]. Therefore, it is somewhat strange to
> have both vertex
> ID and operator ID in a single URL.
>
I think Hang explained it perfectly. Essentially, a vertex may contain one
or more operators, so the operator ID is required to distinguish this case.
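
As a small aside, a sketch of that relationship, assuming the conversion Xuyang referenced is OperatorID#fromJobVertexID (the wrapper class below is just for the example):

```
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;

public class OperatorIdSketch {
    public static void main(String[] args) {
        JobVertexID vertexId = new JobVertexID();
        // Assumed to be the conversion referenced earlier in this thread: it derives one
        // operator ID from a vertex ID. For a chained vertex this identifies only one of
        // the chained operators; the others carry their own, different IDs.
        OperatorID derived = OperatorID.fromJobVertexID(vertexId);
        System.out.println(vertexId + " -> " + derived);
    }
}
```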

2. If I misunderstood the semantics of operator IDs here, then what is the
> relationship
> between vertex ID and operator ID, and do we need a url like
> `/jobs//vertices//operators/`
> to list all operator ids under this vertices?
>
Good question, we definitely need to expose operator IDs through the REST API
to make this usable. I'm looking at how users would currently discover the
vertex id to query. From the supported REST APIs [1], you can currently
obtain it from

1. `/jobs/`
2. `/jobs//plan`

From the response of both these APIs, they include the vertex ids (the
vertices AND nodes fields), but not the operator ids. We would need to add
the logic to the plan generation [2]. The response is a little confusing
because there is a field in the vertices called "operator name". I propose
to add a new field called "operators" to the vertex response object, which
would be a list of objects with the structure

Operator
{
  "id": "THE-FLINK-GENERATED-ID"
}.

The JobManagerOperatorQueryScopeInfo has three fields: jobID, vertexID and
> operatorName. So we should use the operator name in the API.
> If you think we should use the operator id, there need be more changes
> about it.
>
I think we should use operator id since it uniquely identifies an
operator--on the contrary, the operator name does not (it may be empty or
repeated between operators by the user). I actually had a question on that
since you implemented the metric group. What's the reason we use operator
name currently? Could it also use operator id so we can match against the
id?

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/
[2]
https://github.com/apache/flink/blob/416cb7aaa02c176e01485ff11ab4269f76b5e9e2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java#L53

Best,
Mason


On Thu, Jan 11, 2024 at 10:54 PM Hang Ruan  wrote:

> Hi, Mason.
>
> Thanks for driving this FLIP.
>
> The JobManagerOperatorQueryScopeInfo has three fields: jobID, vertexID and
> operatorName. So we should use the operator name in the API.
> If you think we should use the operator id, there need be more changes
> about it.
>
> About Xuyang's questions, we add both vertexID and operatorID
> information because of the operator chain.
> An operator chain has a vertexID and contains many different operators. The
> operator information helps to distinguish them within the same operator chain.
>
> Best,
> Hang
>
>
> Xuyang  wrote on Fri, Jan 12, 2024 at 10:21:
>
> > Hi, Mason.
> > Thanks for driving this FLIP. I think it's important for external systems
> > to be able to
> > perceive the metrics of the operator coordinator. +1 for it.
> >
> >
> > I just have the following minor questions and am looking forward to your
> > reply. Please forgive
> > me if I have some misunderstandings.
> >
> >
> > 1. IIRC, in a sense, operator ID and vertex ID are the same thing. The
> > operator ID can
> > be converted from the vertex ID[1]. Therefore, it is somewhat strange to
> > have both vertex
> > ID and operator ID in a single URL.
> >
> >
> > 2. If I misunderstood the semantics of operator IDs here, then what is
> the
> > relationship
> > between vertex ID and operator ID, and do we need a url like
> > `/jobs//vertices//operators/`
> > to list all operator ids under this vertices?
> >
> >
> >
> >
> > [1]
> >
> https://github.com/apache/flink/blob/7bebd2d9fac517c28afc24c0c034d77cfe2b43a6/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OperatorID.java#L40C27-L40C27
> >
> > --
> >
> > Best!
> > Xuyang
> >
> >
> >
> >
> >
> > At 2024-01-12 04:20:03, "Mason Chen"  wrote:
> > >Hi Devs,
> > >
> > >I'm opening this thread to discuss a short FLIP for exposing
> > >JobManagerOperatorMetrics via REST API [1].
> > >
> > >The current set of REST APIs make it impossible to query coordinator
> > >metrics. This FLIP proposes a new REST API to query the
> > >JobManagerOperatorMetrics.
> > >
> > >[1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-417%3A+Expose+JobManagerOperatorMetrics+via+REST+API
> > >
> > >Best,
> > >Mason
> >
>


Re: [DISCUSS] FLIP-419: Optimize multi-sink query plan generation

2024-01-16 Thread Jim Hughes
Hi Jeyhun,


Generally, I like the idea of speeding up the optimizer in the case of
multiple queries!


I am new to the optimizer, but I have a few comments / questions.



   1. StreamOptimizeContext may still be needed to pass the fact that we
   are optimizing a streaming query.  I don't think this class will go away
   completely.  (I agree it may become more simple if the kind or
   mini-batch configuration can be removed.)
   2. How are the mini-batch and changelog inference rules tightly coupled?
   I looked a little bit and I haven't seen any connection between them.  It
   seems like the changelog inference is what needs to run multiple times.
   3. I think your point about code complexity is unnecessary. StreamOptimizeContext
   extends org.apache.calcite.plan.Context, which is used as an interface to pass
   information and objects through the Calcite stack.
   4. Is an alternative where the complexity of the changelog optimization
   can be moved into the `FlinkChangelogModeInferenceProgram`?  (If this is
   coupling between the mini-batch and changelog rules, then this would not
   make sense.)
   5. There are some other smaller refactorings.  I tried some of them
   here: https://github.com/apache/flink/pull/24108 Mostly, it is syntax
   and using lazy vals to avoid recomputing various things.  (Feel free to
   take whatever actually works; I haven't run the tests.)

Separately, folks on the Calcite dev list are thinking about multi-query
optimization:
https://lists.apache.org/thread/mcdqwrtpx0os54t2nn9vtk17spkp5o5k
https://issues.apache.org/jira/browse/CALCITE-6188

Cheers,


Jim

On Tue, Jan 16, 2024 at 5:45 PM Jeyhun Karimov  wrote:

> Hi devs,
>
> I’d like to start a discussion on FLIP-419: Optimize multi-sink query plan
> generation [1].
>
>
> Currently, the optimization process for multi-sink query plans is
> suboptimal: 1) it requires going through the optimization process several
> times, and 2) as a result, some low-level code complexity is
> introduced into high-level optimization classes such
> as StreamCommonSubGraphBasedOptimizer.
>
>
> To address this issue, this FLIP proposes to decouple changelog and
> mini-batch interval inference from the main optimization process.
>
> Please find more details in the FLIP wiki document [1]. Looking forward to
> your feedback.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-419%3A+Optimize+multi-sink+query+plan+generation
>
>
> Regards,
> Jeyhun Karimov
>


[DISCUSS] FLIP-419: Optimize multi-sink query plan generation

2024-01-16 Thread Jeyhun Karimov
Hi devs,

I’d like to start a discussion on FLIP-419: Optimize multi-sink query plan
generation [1].


Currently, the optimization process for multi-sink query plans is
suboptimal: 1) it requires going through the optimization process several
times, and 2) as a result, some low-level code complexity is
introduced into high-level optimization classes such
as StreamCommonSubGraphBasedOptimizer.


To address this issue, this FLIP proposes to decouple changelog and
mini-batch interval inference from the main optimization process.

Please find more details in the FLIP wiki document [1]. Looking forward to
your feedback.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-419%3A+Optimize+multi-sink+query+plan+generation


Regards,
Jeyhun Karimov


[jira] [Created] (FLINK-34117) CompactCoordinator for table file sink loses data upon job termination

2024-01-16 Thread Alexander Fedulov (Jira)
Alexander Fedulov created FLINK-34117:
-

 Summary: CompactCoordinator for table file sink loses data upon 
job termination
 Key: FLINK-34117
 URL: https://issues.apache.org/jira/browse/FLINK-34117
 Project: Flink
  Issue Type: Bug
Reporter: Alexander Fedulov
Assignee: Alexander Fedulov


CompactCoordinator accumulates data in currentInputFiles and only rolls them 
into inputFiles in snapshotState(). At the same time it relies on separately 
receiving checkpoint indications from the upstream operator via 
processElement() (EndCheckpoint). If the job terminates, the final 
EndCheckpoint can arrive before the snapshotState() gets called. This leads to 
data loss (all events in currentInputFiles get discarded).
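
For illustration, a simplified sketch of the ordering described above (this is not the actual CompactCoordinator code; the EndCheckpoint marker and field names below just mirror the description):

```
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified sketch of the two callbacks involved; not the real implementation. */
public class CompactCoordinatorSketch {

    /** Hypothetical marker element signalling that a checkpoint completed upstream. */
    public static final class EndCheckpoint {
        final long checkpointId;
        public EndCheckpoint(long checkpointId) { this.checkpointId = checkpointId; }
    }

    private final List<String> currentInputFiles = new ArrayList<>();   // collected since the last checkpoint
    private final Map<Long, List<String>> inputFiles = new HashMap<>(); // rolled over per checkpoint id

    public void processElement(Object element) {
        if (element instanceof EndCheckpoint) {
            long cpId = ((EndCheckpoint) element).checkpointId;
            // Compaction only covers files already rolled into inputFiles. If this is the
            // final EndCheckpoint and snapshotState() is never called afterwards, everything
            // still sitting in currentInputFiles is silently discarded.
            List<String> files = inputFiles.remove(cpId);
            if (files != null) {
                files.forEach(f -> System.out.println("compacting " + f));
            }
        } else {
            currentInputFiles.add(String.valueOf(element));
        }
    }

    /** Checkpoint hook: the only place where collected files become visible above. */
    public void snapshotState(long checkpointId) {
        inputFiles.put(checkpointId, new ArrayList<>(currentInputFiles));
        currentInputFiles.clear();
    }
}
```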



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] Accept Flink CDC into Apache Flink

2024-01-16 Thread Gunnar Morling
+1 (non-binding)

Best,

--Gunnar


On Mon, Jan 15, 2024 at 18:24, Fabian Hueske  wrote:

> +1 (binding)
>
> Cheers, Fabian
>
>
> Timo Walther  wrote on Mon, Jan 15, 2024 at 16:00:
>
> > +1 (binding)
> >
> > Cheers,
> > Timo
> >
> >
> > On 09.01.24 10:58, xiangyu feng wrote:
> > > +1 (non-binding)
> > >
> > > Regards,
> > > Xiangyu Feng
> > >
> > > Danny Cranmer  于2024年1月9日周二 17:50写道:
> > >
> > >> +1 (binding)
> > >>
> > >> Thanks,
> > >> Danny
> > >>
> > >> On Tue, Jan 9, 2024 at 9:31 AM Feng Jin 
> wrote:
> > >>
> > >>> +1 (non-binding)
> > >>>
> > >>> Best,
> > >>> Feng Jin
> > >>>
> > >>> On Tue, Jan 9, 2024 at 5:29 PM Yuxin Tan 
> > wrote:
> > >>>
> >  +1 (non-binding)
> > 
> >  Best,
> >  Yuxin
> > 
> > 
> >  Márton Balassi  于2024年1月9日周二 17:25写道:
> > 
> > > +1 (binding)
> > >
> > > On Tue, Jan 9, 2024 at 10:15 AM Leonard Xu 
> > >> wrote:
> > >
> > >> +1(binding)
> > >>
> > >> Best,
> > >> Leonard
> > >>
> > >>> 2024年1月9日 下午5:08,Yangze Guo  写道:
> > >>>
> > >>> +1 (non-binding)
> > >>>
> > >>> Best,
> > >>> Yangze Guo
> > >>>
> > >>> On Tue, Jan 9, 2024 at 5:06 PM Robert Metzger <
> > >> rmetz...@apache.org
> > 
> > >> wrote:
> > 
> >  +1 (binding)
> > 
> > 
> >  On Tue, Jan 9, 2024 at 9:54 AM Guowei Ma 
> > > wrote:
> > 
> > > +1 (binding)
> > > Best,
> > > Guowei
> > >
> > >
> > > On Tue, Jan 9, 2024 at 4:49 PM Rui Fan <1996fan...@gmail.com>
> >  wrote:
> > >
> > >> +1 (non-binding)
> > >>
> > >> Best,
> > >> Rui
> > >>
> > >> On Tue, Jan 9, 2024 at 4:41 PM Hang Ruan <
> > >>> ruanhang1...@gmail.com>
> > >> wrote:
> > >>
> > >>> +1 (non-binding)
> > >>>
> > >>> Best,
> > >>> Hang
> > >>>
> > >>> gongzhongqiang  于2024年1月9日周二
> > >>> 16:25写道:
> > >>>
> >  +1 non-binding
> > 
> >  Best,
> >  Zhongqiang
> > 
> >  Leonard Xu  于2024年1月9日周二 15:05写道:
> > 
> > > Hello all,
> > >
> > > This is the official vote whether to accept the Flink CDC
> > >>> code
> >  contribution
> > > to Apache Flink.
> > >
> > > The current Flink CDC code, documentation, and website can
> > >> be
> > > found here:
> > > code: https://github.com/ververica/flink-cdc-connectors <
> > > https://github.com/ververica/flink-cdc-connectors>
> > > docs: https://ververica.github.io/flink-cdc-connectors/ <
> > > https://ververica.github.io/flink-cdc-connectors/>
> > >
> > > This vote should capture whether the Apache Flink community
> > >>> is
> > >>> interested
> > > in accepting, maintaining, and evolving Flink CDC.
> > >
> > > Regarding my original proposal[1] in the dev mailing list,
> > >> I
> > > firmly
> >  believe
> > > that this initiative aligns perfectly with Flink. For the
> > >>> Flink
> >  community,
> > > it represents an opportunity to bolster Flink's competitive
> >  edge
> > > in
> > > streaming
> > > data integration, fostering the robust growth and
> > >> prosperity
> > >>> of
> > > the
> >  Apache
> > > Flink
> > > ecosystem. For the Flink CDC project, becoming a
> > >> sub-project
> > >>> of
> > >> Apache
> > > Flink
> > > means becoming an integral part of a neutral open-source
> > > community,
> > > capable of
> > > attracting a more diverse pool of contributors.
> > >
> > > All Flink CDC maintainers are dedicated to continuously
> > > contributing
> > >> to
> > > achieve
> > > seamless integration with Flink. Additionally, PMC members
> > >>> like
> > > Jark,
> > > Qingsheng,
> > > and I are willing to facilitate the expansion of
> > >>> contributors
> > > and
> > > committers to
> > > effectively maintain this new sub-project.
> > >
> > > This is a "Adoption of a new Codebase" vote as per the
> > >> Flink
> > > bylaws
> > >>> [2].
> > > Only PMC votes are binding. The vote will be open at least
> > >> 7
> >  days
> > > (excluding weekends), meaning until Thursday January 18
> > >> 12:00
> > > UTC,
> > > or
> > > until we
> > > achieve the 2/3rd majority. We will follow the instructions
> > >>> in
> > > the
> > >>> Flink
> > > Bylaws
> > > in the case 

Re: [DISCUSS] FLIP-388: Support Dynamic Logger Level Adjustment

2024-01-16 Thread David Morávek
Hi Yuepeng,

Thanks for the FLIP! There was already quite a discussion on FLIP-210
[1][2], which proposed more or less the same thing. That FLIP was marked as
out of scope for Flink because the underlying technologies already address it.

Are you aware of the effort? If yes, what has changed since then?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-210%3A+Change+logging+level+dynamically+at+runtime
[2] https://lists.apache.org/thread/n8omkpjf1mk9jphx38b8tfrs4h3nxo3z

Best,
D.

On Tue, Jan 16, 2024 at 4:37 PM Yuepeng Pan  wrote:

> Hi all,
>
>
>
>
> I created the FLIP-388[1] to support dynamic logger level
> adjustment.
>
>
>
>
>  Comprehensive and detailed system logs(like debug, trace, all,
> etc.)
>
> could contribute to improved visibility of internal system execution
> information
>
> and also enhance the efficiency of program debugging. Flink currently only
>
> supports static log level configuration(like debug, trace, all, etc.) to
> help application
>
> debugging, which can lead to the following issues when using static log
> configuration:
>
>   1. A sharp increase in log volume, accelerating disk occupancy.
>
>  2. Potential risks of system performance degradation due to a large
> volume of log printing.
>
>  3. The need to simplify the log configuration afterwards, which inevitably
> causes the program to restart.
>
>
>
>  Therefore, introducing a mechanism to dynamically adjust the online
> log output level
>
> in the event of debugging programs will be meaningful, which can complete
> the switch
>
> of log level configuration without restarting the program.
>
>
>
>
>  I really appreciate Fan Rui(CC'ed), Zhanghao Chen(CC'ed)  for
> providing some valuable help and suggestions.
>
>  Please refer to the FLIP[1] document for more details about the
> proposed design and implementation.
>
> We welcome any feedback and opinions on this proposal.
>
>
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-388%3A+Support+Dynamic+Logger+Level+Adjustment
>
> [2] https://issues.apache.org/jira/browse/FLINK-33320
>
>
>
>
> Best,
>
> Yuepeng Pan


[DISCUSS] FLIP-388: Support Dynamic Logger Level Adjustment

2024-01-16 Thread Yuepeng Pan
Hi all,




I created the FLIP-388[1] to support dynamic logger level adjustment.




 Comprehensive and detailed system logs(like debug, trace, all, etc.) 

could contribute to improved visibility of internal system execution 
information 

and also enhance the efficiency of program debugging. Flink currently only 

supports static log level configuration(like debug, trace, all, etc.) to help 
application 

debugging, which can lead to the following issues when using static log 
configuration:

  1. A sharp increase in log volume, accelerating disk occupancy.

 2. Potential risks of system performance degradation due to a large volume 
of log printing.

 3. The need to simplify the log configuration afterwards, which inevitably 
causes the program to restart.

 

 Therefore, introducing a mechanism to dynamically adjust the log output level 
online when debugging programs will be meaningful, as it allows switching the 
log level configuration without restarting the program.




 I really appreciate Fan Rui(CC'ed), Zhanghao Chen(CC'ed)  for providing 
some valuable help and suggestions. 

 Please refer to the FLIP[1] document for more details about the proposed 
design and implementation. 

We welcome any feedback and opinions on this proposal.




[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-388%3A+Support+Dynamic+Logger+Level+Adjustment
 

[2] https://issues.apache.org/jira/browse/FLINK-33320




Best,

Yuepeng Pan

[jira] [Created] (FLINK-34116) GlobalConfigurationTest.testInvalidStandardYamlFile fails

2024-01-16 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-34116:
-

 Summary: GlobalConfigurationTest.testInvalidStandardYamlFile fails
 Key: FLINK-34116
 URL: https://issues.apache.org/jira/browse/FLINK-34116
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Affects Versions: 1.19.0
Reporter: Matthias Pohl


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56416=logs=f0ac5c25-1168-55a5-07ff-0e88223afed9=50bf7a25-bdc4-5e56-5478-c7b4511dde53=6274]
{code:java}
Jan 16 01:38:10 01:38:10.780 [ERROR] Failures: 
Jan 16 01:38:10 01:38:10.781 [ERROR]   
GlobalConfigurationTest.testInvalidStandardYamlFile:200 
Jan 16 01:38:10 Multiple Failures (1 failure)
Jan 16 01:38:10 -- failure 1 --
Jan 16 01:38:10 Expecting actual:
Jan 16 01:38:10   "java.lang.RuntimeException: Error parsing YAML configuration.
Jan 16 01:38:10 at 
org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:351)
Jan 16 01:38:10 at 
org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:162)
Jan 16 01:38:10 at 
org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:115)
Jan 16 01:38:10 at 
org.apache.flink.configuration.GlobalConfigurationTest.lambda$testInvalidStandardYamlFile$3(GlobalConfigurationTest.java:198)
Jan 16 01:38:10 at 
org.assertj.core.api.ThrowableAssert.catchThrowable(ThrowableAssert.java:63)
Jan 16 01:38:10 at 
org.assertj.core.api.AssertionsForClassTypes.catchThrowable(AssertionsForClassTypes.java:892)
Jan 16 01:38:10 at 
org.assertj.core.api.Assertions.catchThrowable(Assertions.java:1366)
Jan 16 01:38:10 at 
org.assertj.core.api.Assertions.assertThatThrownBy(Assertions.java:1210)
Jan 16 01:38:10 at 
org.apache.flink.configuration.GlobalConfigurationTest.testInvalidStandardYamlFile(GlobalConfigurationTest.java:198)
Jan 16 01:38:10 at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Jan 16 01:38:10 at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Jan 16 01:38:10 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jan 16 01:38:10 at 
java.base/java.lang.reflect.Method.invoke(Method.java:566)
[...] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34115) TableAggregateITCase.testFlagAggregateWithOrWithoutIncrementalUpdate fails

2024-01-16 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-34115:
-

 Summary: 
TableAggregateITCase.testFlagAggregateWithOrWithoutIncrementalUpdate fails
 Key: FLINK-34115
 URL: https://issues.apache.org/jira/browse/FLINK-34115
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.19.0
Reporter: Matthias Pohl


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=56348=logs=de826397-1924-5900-0034-51895f69d4b7=f311e913-93a2-5a37-acab-4a63e1328f94=11613]
{code:java}
 Jan 14 01:20:01 01:20:01.949 [ERROR] Tests run: 18, Failures: 1, Errors: 0, 
Skipped: 0, Time elapsed: 29.07 s <<< FAILURE! -- in 
org.apache.flink.table.planner.runtime.stream.table.TableAggregateITCase
Jan 14 01:20:01 01:20:01.949 [ERROR] 
org.apache.flink.table.planner.runtime.stream.table.TableAggregateITCase.testFlagAggregateWithOrWithoutIncrementalUpdate
 -- Time elapsed: 0.518 s <<< FAILURE!
Jan 14 01:20:01 org.opentest4j.AssertionFailedError: 
Jan 14 01:20:01 
Jan 14 01:20:01 expected: List((true,6,1), (false,6,1), (true,6,1), (true,3,2), 
(false,6,1), (false,3,2), (true,6,1), (true,5,2), (false,6,1), (false,5,2), 
(true,8,1), (true,6,2), (false,8,1), (false,6,2), (true,8,1), (true,6,2))
Jan 14 01:20:01  but was: List((true,3,1), (false,3,1), (true,5,1), (true,3,2), 
(false,5,1), (false,3,2), (true,8,1), (true,5,2), (false,8,1), (false,5,2), 
(true,8,1), (true,5,2), (false,8,1), (false,5,2), (true,8,1), (true,6,2))
Jan 14 01:20:01 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Jan 14 01:20:01 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Jan 14 01:20:01 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Jan 14 01:20:01 at 
org.apache.flink.table.planner.runtime.stream.table.TableAggregateITCase.checkRank$1(TableAggregateITCase.scala:122)
Jan 14 01:20:01 at 
org.apache.flink.table.planner.runtime.stream.table.TableAggregateITCase.testFlagAggregateWithOrWithoutIncrementalUpdate(TableAggregateITCase.scala:69)
Jan 14 01:20:01 at java.lang.reflect.Method.invoke(Method.java:498)
Jan 14 01:20:01 at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
Jan 14 01:20:01 at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
Jan 14 01:20:01 at 
java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
Jan 14 01:20:01 at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
Jan 14 01:20:01 at 
java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
Jan 14 01:20:01 at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
Jan 14 01:20:01 at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
Jan 14 01:20:01 at 
java.util.Iterator.forEachRemaining(Iterator.java:116)
Jan 14 01:20:01 at 
scala.collection.convert.Wrappers$IteratorWrapper.forEachRemaining(Wrappers.scala:26)
Jan 14 01:20:01 at 
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
Jan 14 01:20:01 at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
Jan 14 01:20:01 at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
Jan 14 01:20:01 at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
Jan 14 01:20:01 at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
Jan 14 01:20:01 at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
Jan 14 01:20:01 at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
Jan 14 01:20:01 at 
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:272)
Jan 14 01:20:01 at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
Jan 14 01:20:01 at 
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
Jan 14 01:20:01 at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
Jan 14 01:20:01 at 
java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
Jan 14 01:20:01 at 
java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
Jan 14 01:20:01 at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
Jan 14 01:20:01 at 
java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
Jan 14 01:20:01 at 
java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
Jan 14 01:20:01 at 
java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
Jan 14 01:20:01 at 

Re: Flink SQL ANTI JOIN results with windowing in DataStream

2024-01-16 Thread Péter Váry
Turns out the issue was that the windows were not triggered for the joins.
For one of the tasks the watermark was advanced, so the fsFiles and the
tableFiles windows were triggered, but the global watermark was not
advanced (parallelism was 4). Since the global watermark was not advanced,
the join window did not fire.
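
For the archives, a minimal sketch of one way to keep the combined watermark advancing when some parallel subtasks are idle (the event type and field names below are made up for the example; only the WatermarkStrategy calls are real Flink APIs):

```
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class IdleSourceWatermarkSketch {

    /** Hypothetical event type standing in for the file events in this thread. */
    public static class FileEvent {
        public long ts;          // event timestamp in epoch millis
        public String location;  // file location
    }

    /**
     * Subtasks that stop emitting records are marked idle after 10 seconds, so they
     * no longer hold back the combined watermark that triggers the join window.
     */
    public static WatermarkStrategy<FileEvent> watermarkStrategy() {
        return WatermarkStrategy.<FileEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner((event, recordTs) -> event.ts)
                .withIdleness(Duration.ofSeconds(10));
    }
}
```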

Sorry about the false alarm.

Thanks,
Peter

Péter Váry  ezt írta (időpont: 2024. jan. 16.,
K, 13:27):

> Hi Team,
>
> I am working on Iceberg in process compaction, and trying to use SQL
> window join to compare 2 streams like this:
>
>
>
>
> *Table fsFiles = tEnv.sqlQuery("SELECT runId, location, window_start,
> window_end " +"FROM TABLE(" +"*
>
> *TUMBLE(" +"TABLE " + fileSystemFilesTable + "," + "
>   DESCRIPTOR(ts), " +*
>
> *"**INTERVAL '1' SECONDS))");*
>
>
>
> *Table tableFiles = tEnv.sqlQuery(*
>
> *"SELECT runId, location, window_start, window_end " +*
>
> *"FROM TABLE(" +*
>
> *"**TUMBLE(" +*
>
> *"TABLE " + **tableFilesTable** + "," +*
>
> *"DESCRIPTOR(ts), " +**"**INTERVAL '1'
> SECONDS))");*
>
>
> Then I print out these streams with the following code, I see the values
> in the logs:
>
> *tEnv.toDataStream(fsFiles).print("FS");*
> *tEnv.toDataStream(tableFiles).print("TS");*
>
>
> The result is:
>
> *FS:2> +I[1705405510802,
> file:/var/folders/19/xs17kb0j7dj0klq324_vj7scgn/T/junit13711198986865553391/db.db/test_table_with_pk/metadata/0-b717c629-bb71-48df-a30b-615aeb320aec.metadata.json,
> 2024-01-16T11:45:10, 2024-01-16T11:45:11]*
> *[..]*
> *TS:2> +I[1705405510802,
> file:/var/folders/19/xs17kb0j7dj0klq324_vj7scgn/T/junit13711198986865553391/db.db/test_table_unpartitioned/metadata/snap-532818363465442978-1-dc47e70d-82eb-490a-a21d-c032b88c3303.avro,
> 2024-01-16T11:45:10, 2024-01-16T11:45:11]*
> *[..]*
>
>
> So this is as I expected the 2 streams periodically emit the incoming data
> files with the runIds, timestamps.
>
> Now, I try to run an ANTI JOIN on these streams:
>
> *Table missingFiles = tEnv.sqlQuery(*
> *"   SELECT ts, location\n" +*
> *"   FROM (\n" +*
> *"   SELECT * FROM TABLE(TUMBLE(TABLE " + fsFiles + ",
> DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +*
> *"   ) L WHERE L.location NOT IN (\n" +*
> *" SELECT location FROM (   \n" +*
> *"   SELECT * FROM TABLE(TUMBLE(TABLE " + tableFiles + ",
> DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +*
> *" ) R WHERE L.window_start = R.window_start AND
> L.window_end = R.window_end)");*
>
>
> And even though there are some missing files based on the logs, I do not
> see any records in the logs for the missing table:
>
> *tEnv.toDataStream(missingFiles).print("MISSING");*
>
>
> Just for trying out a different way of solving/checking this, I tried to
> have a FULL JOIN to see how the join works:
>
> *Table joined = tEnv.sqlQuery(*
> *"SELECT fs_files.location AS fs_location,
> table_files.location AS table_location,\n" +*
> *"  COALESCE(fs_files.window_start, table_files.window_start)
> as window_start,\n" +*
> *"  COALESCE(fs_files.window_end, table_files.window_end) as
> window_end\n" +*
> *"  FROM (SELECT * FROM TABLE(TUMBLE(TABLE " + fsFiles + ",
> DESCRIPTOR(ts), INTERVAL '1' SECONDS))) fs_files\n" +*
> *"  LEFT JOIN (SELECT * FROM TABLE(TUMBLE(TABLE " + tableFiles
> + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))) table_files\n" +*
> *"  ON fs_files.location = table_files.location AND\n" +*
> *" fs_files.window_start = table_files.window_start AND
> \n" +*
> *" fs_files.window_end = table_files.window_end\n");*
>
>
> And there is nothing in the logs for the join either.
>
> I think I might miss something around the windowing, and my joined windows
> are not triggered with the complex queries, but I am stuck at the moment,
> so any help would be appreciated.
>
> Thanks,
> Peter
>


Flink SQL ANTI JOIN results with windowing in DataStream

2024-01-16 Thread Péter Váry
Hi Team,

I am working on Iceberg in-process compaction, and trying to use SQL window
join to compare 2 streams like this:




Table fsFiles = tEnv.sqlQuery(
        "SELECT runId, location, window_start, window_end " +
        "FROM TABLE(" +
        "  TUMBLE(" +
        "    TABLE " + fileSystemFilesTable + "," +
        "    DESCRIPTOR(ts), " +
        "    INTERVAL '1' SECONDS))");

Table tableFiles = tEnv.sqlQuery(
        "SELECT runId, location, window_start, window_end " +
        "FROM TABLE(" +
        "  TUMBLE(" +
        "    TABLE " + tableFilesTable + "," +
        "    DESCRIPTOR(ts), " +
        "    INTERVAL '1' SECONDS))");


Then I print out these streams with the following code, and I see the values in
the logs:

tEnv.toDataStream(fsFiles).print("FS");
tEnv.toDataStream(tableFiles).print("TS");


The result is:

FS:2> +I[1705405510802,
file:/var/folders/19/xs17kb0j7dj0klq324_vj7scgn/T/junit13711198986865553391/db.db/test_table_with_pk/metadata/0-b717c629-bb71-48df-a30b-615aeb320aec.metadata.json,
2024-01-16T11:45:10, 2024-01-16T11:45:11]
[..]
TS:2> +I[1705405510802,
file:/var/folders/19/xs17kb0j7dj0klq324_vj7scgn/T/junit13711198986865553391/db.db/test_table_unpartitioned/metadata/snap-532818363465442978-1-dc47e70d-82eb-490a-a21d-c032b88c3303.avro,
2024-01-16T11:45:10, 2024-01-16T11:45:11]
[..]


So, as I expected, the 2 streams periodically emit the incoming data
files with the runIds and timestamps.

Now, I try to run an ANTI JOIN on these streams:

Table missingFiles = tEnv.sqlQuery(
    "   SELECT ts, location\n" +
    "   FROM (\n" +
    "   SELECT * FROM TABLE(TUMBLE(TABLE " + fsFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +
    "   ) L WHERE L.location NOT IN (\n" +
    " SELECT location FROM (   \n" +
    "   SELECT * FROM TABLE(TUMBLE(TABLE " + tableFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +
    " ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end)");


And even though there are some missing files based on the logs, I do not
see any records in the logs for the missing table:

tEnv.toDataStream(missingFiles).print("MISSING");


Just to try out a different way of solving/checking this, I wrote a FULL
JOIN to see how the join works:

Table joined = tEnv.sqlQuery(
    "SELECT fs_files.location AS fs_location, table_files.location AS table_location,\n" +
    "  COALESCE(fs_files.window_start, table_files.window_start) as window_start,\n" +
    "  COALESCE(fs_files.window_end, table_files.window_end) as window_end\n" +
    "  FROM (SELECT * FROM TABLE(TUMBLE(TABLE " + fsFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))) fs_files\n" +
    "  LEFT JOIN (SELECT * FROM TABLE(TUMBLE(TABLE " + tableFiles + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))) table_files\n" +
    "  ON fs_files.location = table_files.location AND\n" +
    "     fs_files.window_start = table_files.window_start AND\n" +
    "     fs_files.window_end = table_files.window_end\n");


And there is nothing in the logs for the join either.

I think I might be missing something around the windowing, and my joined windows
are not triggered by the more complex queries, but I am stuck at the moment,
so any help would be appreciated.

Thanks,
Peter
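
For reference, Flink's documented window anti-join keeps the window TVF and the
window_start/window_end equality in a single query. A minimal sketch along those
lines, assuming the TUMBLE TVF is applied once, directly to the base tables
(fileSystemFilesTable, tableFilesTable), and that ts carries a watermark so the
windows can close:

Table missingFiles = tEnv.sqlQuery(
    "SELECT window_start, window_end, location FROM (\n" +
    "  SELECT * FROM TABLE(TUMBLE(TABLE " + fileSystemFilesTable + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +
    ") L WHERE L.location NOT IN (\n" +
    "  SELECT location FROM (\n" +
    "    SELECT * FROM TABLE(TUMBLE(TABLE " + tableFilesTable + ", DESCRIPTOR(ts), INTERVAL '1' SECONDS))\n" +
    "  ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end)");

// Window TVF results are only emitted once the watermark on ts has passed the
// window end, so nothing shows up in the MISSING stream until the watermark advances.
tEnv.toDataStream(missingFiles).print("MISSING");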


Re: Re:[DISCUSS] FLIP-418: Show data skew score on Flink Dashboard

2024-01-16 Thread Kartoglu, Emre
Hi Xuyang,

Thanks for the feedback! Please find my response below.

> 1. How will the colors of vertices with high data skew scores be unified with
> existing backpressure and high busyness colors on the UI? Users should be able
> to distinguish at a glance which vertices in the entire job graph are skewed.

The current proposal does not suggest changing the colours of the vertices
based on data skew. In another exchange with Rui, we touch on why data skew 
might not necessarily be bad (for instance if data skew is the designed 
behaviour). The colours are currently dedicated to the Busy/Backpressure 
metrics. I would not be keen on introducing another colour or using the same 
colours for data skew as I am not sure if that'll help or confuse users. I am 
also keen to keep the scope of this FLIP as minimal as possible with as few 
contentious points as possible. We could also revisit this point in future 
FLIPs, if it does not become a blocker for this one. Please let me know your 
thoughts.

> 2. Can you tell me whether you prefer to unify the Data Skew Score and the
> Exception tab? In my opinion, the Data Skew Score is in the same category as
> the existing Backpressured and Busy metrics.

The FLIP does not propose to unify the Data Skew tab and the Exception tab. The 
proposed Data Skew tab would sit next to the Exception tab (but I'm not too 
opinionated on where it sits). Backpressure and Busy metrics are somewhat 
special in that they have high visibility thanks to the vertices changing 
colours based on their value. I agree that Data Skew is in the same category in 
that it can be used as an indicator of the job's health. I'm not sure if the 
suggestion here then is to not introduce a tab for data skew? I'd appreciate 
some clarification here.

Look forward to hearing your thoughts.

Emre


On 16/01/2024, 06:05, "Xuyang" <xyzhong...@163.com> wrote:








Hi, Emre.




In large-scale production jobs, the phenomenon of data skew often occurs.
Having a metric on the UI that reflects data skew, without the need to manually
inspect each vertex by clicking on it, would be quite cool.
This could help users quickly identify problematic nodes, simplifying
development and operations.




I'm mainly curious about two minor points:
1. How will the colors of vertices with high data skew scores be unified with
existing backpressure and high busyness colors on the UI? Users should be able
to distinguish at a glance which vertices in the entire job graph are skewed.
2. Can you tell me whether you prefer to unify the Data Skew Score and the
Exception tab? In my opinion, the Data Skew Score is in the same category as
the existing Backpressured and Busy metrics.




Looking forward to your reply.






--


Best!
Xuyang










At 2024-01-16 00:59:57, "Kartoglu, Emre" <kar...@amazon.co.uk.INVALID> wrote:
>Hello,
>
>I’m opening this thread to discuss a FLIP[1] to make data skew more visible on 
>Flink Dashboard.
>
>Data skew is currently not as visible as it should be. Users have to click 
>each operator and check how much data each sub-task is processing and compare 
>the sub-tasks against each other. This is especially cumbersome and 
>error-prone for jobs with big job graphs and high parallelism. I’m proposing 
>this FLIP to improve this.
>
>Kind regards,
>Emre
>
>[1] 
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-418%3A+Show+data+skew+score+on+Flink+Dashboard
> 
>
>
>
>





Re: [DISCUSS] FLIP-418: Show data skew score on Flink Dashboard

2024-01-16 Thread Kartoglu, Emre
Hi Rui,

Thanks for the feedback. Please find my response below:

> The number_of_records_received_by_each_subtask is the total received records, 
> right?

No, it's not the total. I understand why this is confusing. I had initially
wanted to name it "the list of number of records received by each subtask". So 
its type is a list. Example: [10, 10, 10] => 3 sub-tasks and each one received 
10 records. 
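
(For illustration only, since the FLIP's exact scoring formula is defined in the
linked document rather than quoted here: one plausible way to turn such a
per-subtask list into a single score is the maximum relative deviation from the
mean, e.g.:)

// Hypothetical illustration of deriving a single skew score from the
// per-subtask record counts described above; not the FLIP's actual formula.
import java.util.List;

public class DataSkewScoreSketch {

    /** Skew as the maximum deviation from the mean, expressed as a percentage of the mean. */
    static double skewScore(List<Long> recordsPerSubtask) {
        double mean = recordsPerSubtask.stream().mapToLong(Long::longValue).average().orElse(0);
        if (mean == 0) {
            return 0;
        }
        double maxDeviation = recordsPerSubtask.stream()
                .mapToDouble(n -> Math.abs(n - mean))
                .max()
                .orElse(0);
        return 100.0 * maxDeviation / mean;
    }

    public static void main(String[] args) {
        System.out.println(skewScore(List.of(10L, 10L, 10L))); // 0.0   -> no skew
        System.out.println(skewScore(List.of(30L, 0L, 0L)));   // 200.0 -> heavy skew
    }
}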

In your example, you have subtasks with each one designed to receive records at 
different times of the day. I hadn't thought about this use case! 
So you would have a high data skew while 1 subtask is receiving all the data, 
but on average (say over 1-2 days) data skew would come down to 0 because all 
subtasks would have received their portion of the data.
I'm inclined to think that the current proposal might still be fair, as you do 
indeed have a skew by definition (but an intentional one). We can have a few 
ways forward:

0) We can keep the behaviour as proposed. My thoughts are that data skew is 
data skew, however intentional it may be. It is not necessarily bad, like in 
your example.

1) Show data skew computed from the beginning of the job (not a live/current score). I
mentioned some downsides to this in the FLIP: if you recently broke or fixed your data
skew, the historical data might hide the recent fix/breakage, and it
is inconsistent with the other metrics shown on the vertices, e.g. the
Backpressure/Busy metrics show the live/current score.

2) We can choose not to put the data skew score on the vertices of the job graph,
and instead just use the newly proposed Data Skew tab, which could show the
live/current skew score and the total data skew score from the beginning of the job.

Keen to hear your thoughts.

Kind regards,
Emre


On 16/01/2024, 06:44, "Rui Fan" <1996fan...@gmail.com> wrote:








Thanks Emre for driving this proposal!


It's very useful for troubleshooting.


I have a question:


The number_of_records_received_by_each_subtask is the
total received records, right?


I'm not sure whether we should check data skew based on
the latest duration period.


In production, I found that the total received records of
all subtasks are balanced, but in each time period, they
are skewed.


For example, a Flink job has `group by` or `keyBy` based on an
hour field. That means:
- In the 0-1 o'clock, subtaskA is busy, the rest of subtasks are idle.
- In the 1-2 o'clock, subtaskB is busy, the rest of subtasks are idle.
- Next hour, the busy subtask is changed.


Looking forward to your opinions~


Best,
Rui


On Tue, Jan 16, 2024 at 2:05 PM Xuyang <xyzhong...@163.com> wrote:


> Hi, Emre.
>
>
> In large-scale production jobs, the phenomenon of data skew often occurs.
> Having a metric on the UI that
> reflects data skew, without the need to manually inspect each vertex
> by clicking on it, would be quite cool.
> This could help users quickly identify problematic nodes, simplifying
> development and operations.
>
>
> I'm mainly curious about two minor points:
> 1. How will the colors of vertices with high data skew scores be unified
> with existing backpressure and high busyness
> colors on the UI? Users should be able to distinguish at a glance which
> vertices in the entire job graph are skewed.
> 2. Can you tell me whether you prefer to unify the Data Skew Score and the
> Exception tab? In my opinion, the Data Skew Score is in
> the same category as the existing Backpressured and Busy metrics.
>
>
> Looking forward to your reply.
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> At 2024-01-16 00:59:57, "Kartoglu, Emre" wrote:
> >Hello,
> >
> >I’m opening this thread to discuss a FLIP[1] to make data skew more
> visible on Flink Dashboard.
> >
> >Data skew is currently not as visible as it should be. Users have to
> click each operator and check how much data each sub-task is processing and
> compare the sub-tasks against each other. This is especially cumbersome and
> error-prone for jobs with big job graphs and high parallelism. I’m
> proposing this FLIP to improve this.
> >
> >Kind regards,
> >Emre
> >
> >[1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-418%3A+Show+data+skew+score+on+Flink+Dashboard
>  
> 
> >
> >
> >
>





[jira] [Created] (FLINK-34114) Parse error while $internal.application.program-args contains '#' in Yarn/K8s Application Mode

2024-01-16 Thread jonasjc (Jira)
jonasjc created FLINK-34114:
---

 Summary: Parse error while $internal.application.program-args 
contains '#' in Yarn/K8s Application Mode
 Key: FLINK-34114
 URL: https://issues.apache.org/jira/browse/FLINK-34114
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.18.0
Reporter: jonasjc
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Support detecting table schema from external files.

2024-01-16 Thread Benchao Li
Thanks Yisha for bringing up this discussion. Schema inferring is a
very interesting and useful feature, especially when it comes to
formats with well defined schemas such as Protobuf/Parquet. I'm
looking forward to the FLIP.

Yisha Zhou wrote on Mon, Jan 15, 2024 at 16:29:
>
> Hi dev,
>
> Currently, we are used to creating a table by listing all physical columns
> or using the LIKE syntax to reuse a table schema from the Catalogs.
> However, in our company there are many cases where the messages in the
> external systems have very complex schemas. The worst
> case is some protobuf data that has thousands of fields in it.
>
> In these cases, listing the fields in the DDL is very hard work. Creating
> and updating such complex schemas in Catalogs also costs a lot.
> Therefore, I’d like to introduce an ability to detect the table schema from
> external files in DDL.
>
> A good precedent from Snowflake[1] works like below:
>
> CREATE TABLE mytable
>   USING TEMPLATE (
> SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
>   FROM TABLE(
> INFER_SCHEMA(
>   LOCATION=>'@mystage/json/',
>   FILE_FORMAT=>'my_json_format'
> )
>   ));
>
> INFER_SCHEMA is a table function that 'automatically detects the file
> metadata schema in a set of staged data files that contain
> semi-structured data and retrieves the column definitions.’ The files can be
> in Parquet, Avro, ORC, JSON, and CSV.
>
> We don’t need to follow the syntax, but the functionality is exactly what I
> want. In addition, the file can be more than just a semi-structured data
> file. It can be a metadata file, for example a .proto file or a .thrift file.
>
> As it will be a big feature, it deserves a FLIP to describe it in detail.
> I'm looking forward to your feedback and suggestions before I start to do it.
>
> Best,
> Yisha
>
> [1]https://docs.snowflake.com/en/sql-reference/functions/infer_schema 
> 



-- 

Best,
Benchao Li


Re: [DISCUSS] FLIP-415: Introduce a new join operator to support minibatch

2024-01-16 Thread Benchao Li
shuai,

Thanks for the explanations, I understand the scenario you described
now. IIUC, it will be a rather rare case that needs to disable
"compaction" when mini-batch is enabled, so I won't be against
introducing it. However, I would suggest enabling the "compaction" by
default (if mini-batch is enabled), which will benefit most use cases.
Others that have special requirements about the changelog semantics
(no compaction) can disable compaction by themselves. WDYT?

> This is a relatively large optimization that may pose a significant
> risk of bugs, so I like to keep it from being enabled by default for
> now.
@Jingsong has raised an interesting point that for large optimizations
or new features, we want to have an option for them and disable them by
default because of the risk of bugs. I agree with it, mostly.
Currently there is no standard for whether a change is major or not,
which means we may run into situations where we debate whether a change is
major or not. Anyway, it's an orthogonal topic to this discussion.

shuai xu wrote on Tue, Jan 16, 2024 at 13:14:
>
> Hi Benchao,
>
> Do you have any other questions about this issue?  Also, I would appreciate 
> your thoughts on the proposal to introduce the new option 
> 'table.exec.mini-batch.compact-changes-enabled'. I’m looking forward to your
> feedback.
>
> > On Jan 12, 2024 at 15:01, shuai xu wrote:
> >
> > Suppose we currently have a job that joins two CDC sources after 
> > de-duplicating them and the output is available for audit analysis, and the 
> > user turns off the parameter 
> > "table.exec.deduplicate.mini-batch.compact-changes-enabled" to ensure that 
> > it does not lose update details. If we don't introduce this parameter, 
> > after the user upgrades the version, some update details may be lost due to
> > the mini-batch join being enabled by default, resulting in distorted
> > audit results.
> >
> >> On Jan 11, 2024 at 16:19, Benchao Li wrote:
> >>
> >>> the change might not be expected by the downstream of the job which
> >>> requires details of the changelog
> >>
> >> Could you elaborate on this a bit? I've never met such kinds of
> >> requirements before, I'm curious what is the scenario that requires
> >> this.
> >>
> >>> shuai xu wrote on Thu, Jan 11, 2024 at 13:08:
> >>>
> >>> Thanks for your response, Benchao.
> >>>
> >>> Here is my thought on the newly added option.
> >>> Users' current jobs are running on a version without minibatch join. If 
> >>> the existing option to enable minibatch join is utilized, then when 
> >>> users' jobs are migrated to the new version, the internal behavior of the 
> >>> join operation within the jobs will change. Although the semantic of 
> >>> changelog emitted by the Join operator is eventual consistency, the 
> >>> change might not be supposed for the downstream of the job which requires 
> >>> details of changelog. This newly added option also refers to 
> >>> 'table.exec.deduplicate.mini-batch.compact-changes-enabled'.
> >>>
> >>> As for the implementation,The new operator shares the state of the 
> >>> original operator and it merely has an additional minibatch for storing 
> >>> records to do some optimization. The storage remains consistent, and 
> >>> there is minor modification to the computational logic.
> >>>
> >>> Best,
> >>> Xu Shuai
> >>>
>  On Jan 10, 2024 at 22:56, Benchao Li wrote:
> 
>  Thanks shuai for driving this, mini-batch Join is a very useful
>  optimization, +1 for the general idea.
> 
>  Regarding the configuration
>  "table.exec.stream.join.mini-batch-enabled", I'm not sure it's really
>  necessary. The semantic of changelog emitted by the Join operator is
>  eventual consistency, so there is no much difference between original
>  Join and mini-batch Join from this aspect. Besides, introducing more
>  options would make it more complex for users, harder to understand and
>  maintain, which we should be careful about.
> 
>  One thing about the implementation, could you make the new operator
>  share the same state definition with the original one?
> 
>  shuai xu wrote on Wed, Jan 10, 2024 at 21:23:
> >
> > Hi devs,
> >
> > I’d like to start a discussion on FLIP-415: Introduce a new join 
> > operator to support minibatch[1].
> >
> > Currently, when performing cascading joins in Flink, there is a
> > pain point of record amplification. Every record the join operator receives
> > triggers the join process. However, if a +I and a -D record match,
> > they can be folded to avoid two join processes. Besides,
> > -U/+U records might output 4 records, two of which are
> > redundant, when encountering an outer join.
> >
> > To address this issue, this FLIP introduces a new
> > MiniBatchStreamingJoinOperator to achieve batch processing, which could
> > reduce the number of redundant output messages and avoid unnecessary
> > join processes.
> > A new option is added to 
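
(Side note: a minimal, self-contained sketch of the +I/-D folding idea described
in the quoted proposal above. It uses simplified stand-in types instead of
Flink's RowKind/RowData, so it is an illustration rather than the FLIP's actual
operator code.)

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MiniBatchFoldingSketch {

    enum Kind { INSERT, DELETE }            // simplified stand-in for Flink's RowKind

    record Change(Kind kind, String key) {} // hypothetical record type, for the sketch only

    /** Returns the changes that still need to be joined after folding +I/-D pairs per key. */
    static List<Change> fold(List<Change> batch) {
        Map<String, Integer> netInserts = new HashMap<>();
        for (Change c : batch) {
            netInserts.merge(c.key(), c.kind() == Kind.INSERT ? 1 : -1, Integer::sum);
        }
        List<Change> folded = new ArrayList<>();
        netInserts.forEach((key, net) -> {
            Kind kind = net > 0 ? Kind.INSERT : Kind.DELETE;
            for (int i = 0; i < Math.abs(net); i++) {
                folded.add(new Change(kind, key));
            }
        });
        return folded;
    }

    public static void main(String[] args) {
        List<Change> batch = List.of(
                new Change(Kind.INSERT, "k1"),
                new Change(Kind.DELETE, "k1"),  // cancels the insert above, no join needed
                new Change(Kind.INSERT, "k2"));
        System.out.println(fold(batch));        // only k2's insert survives the folding
    }
}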

Re:Re: [VOTE] FLIP-377: Support fine-grained configuration to control filter push down for Table/SQL Sources

2024-01-16 Thread Xuyang
+1 (non-binding)


--

Best!
Xuyang





On 2024-01-16 17:52:38, "Leonard Xu" wrote:
>+1 (binding)
>
>Best,
>Leonard
>
> On Jan 16, 2024 at 5:40 PM, Hang Ruan wrote:
>> 
>> +1 (non-binding)
>> 
>> Best,
>> Hang
>> 
Jiabao Sun wrote on Tue, Jan 9, 2024 at 19:39:
>> 
>>> Hi Devs,
>>> 
>>> I'd like to start a vote on FLIP-377: Support fine-grained configuration
>>> to control filter push down for Table/SQL Sources[1]
>>> which has been discussed in this thread[2].
>>> 
>>> The vote will be open for at least 72 hours unless there is an objection
>>> or not enough votes.
>>> 
>>> [1]
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=276105768
>>> [2] https://lists.apache.org/thread/nvxx8sp9jm009yywm075hoffr632tm7j
>>> 
>>> Best,
>>> Jiabao


Re: [VOTE] FLIP-377: Support fine-grained configuration to control filter push down for Table/SQL Sources

2024-01-16 Thread Leonard Xu
+1 (binding)

Best,
Leonard

> On Jan 16, 2024 at 5:40 PM, Hang Ruan wrote:
> 
> +1 (non-binding)
> 
> Best,
> Hang
> 
Jiabao Sun wrote on Tue, Jan 9, 2024 at 19:39:
> 
>> Hi Devs,
>> 
>> I'd like to start a vote on FLIP-377: Support fine-grained configuration
>> to control filter push down for Table/SQL Sources[1]
>> which has been discussed in this thread[2].
>> 
>> The vote will be open for at least 72 hours unless there is an objection
>> or not enough votes.
>> 
>> [1]
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=276105768
>> [2] https://lists.apache.org/thread/nvxx8sp9jm009yywm075hoffr632tm7j
>> 
>> Best,
>> Jiabao



[jira] [Created] (FLINK-34113) Update flink-connector-elasticsearch to be compatible with updated SinkV2 interfaces

2024-01-16 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-34113:
--

 Summary: Update flink-connector-elasticsearch to be compatible 
with updated SinkV2 interfaces
 Key: FLINK-34113
 URL: https://issues.apache.org/jira/browse/FLINK-34113
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / ElasticSearch
Reporter: Martijn Visser
 Fix For: elasticsearch-3.2.0


Make sure that the connector is updated to deal with the new changes introduced 
in FLINK-33973



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] FLIP-377: Support fine-grained configuration to control filter push down for Table/SQL Sources

2024-01-16 Thread Hang Ruan
+1 (non-binding)

Best,
Hang

Jiabao Sun wrote on Tue, Jan 9, 2024 at 19:39:

> Hi Devs,
>
> I'd like to start a vote on FLIP-377: Support fine-grained configuration
> to control filter push down for Table/SQL Sources[1]
> which has been discussed in this thread[2].
>
> The vote will be open for at least 72 hours unless there is an objection
> or not enough votes.
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=276105768
> [2] https://lists.apache.org/thread/nvxx8sp9jm009yywm075hoffr632tm7j
>
> Best,
> Jiabao


[jira] [Created] (FLINK-34112) JavaCodeSplitter OOM

2024-01-16 Thread Junning Liang (Jira)
Junning Liang created FLINK-34112:
-

 Summary: JavaCodeSplitter OOM
 Key: FLINK-34112
 URL: https://issues.apache.org/jira/browse/FLINK-34112
 Project: Flink
  Issue Type: Bug
Reporter: Junning Liang


I wrote a SQL query that uses many CASE WHEN expressions on the Flink 1.17 release version.
But even if the client is provided with 8 GB of memory, there is still an OOM exception:
{code:java}
16:38:51,975 ERROR org.apache.flink.client.didi.job.FlinkKubernetesJobClient
[] - Failed to execute flink 
job.org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: JavaCodeSplitter failed. This is a bug. Please file an issue.  
  at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
 ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
 ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.client.didi.job.FlinkKubernetesJobClient.internalSubmitJob(FlinkKubernetesJobClient.java:474)
 ~[flink-client-executor-core-1.17.0-011_pre.jar:?]at 
org.apache.flink.client.didi.job.FlinkKubernetesJobClient.submitStreamSQL(FlinkKubernetesJobClient.java:184)
 [flink-client-executor-core-1.17.0-011_pre.jar:?]at 
com.didichuxing.bigdata.flink.client.executor.k8s.core.component.FlinkJobK8sClientComponent.startStreamSql(FlinkJobK8sClientComponent.java:107)
 [flink-client-executor-core-1.17.0-011_pre.jar:?]at 
com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.handleStartFlinkJob(FlinkK8sClientExecutor.java:230)
 [flink-client-executor-core-1.17.0-011_pre.jar:?]at 
com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.handleStartJob(FlinkK8sClientExecutor.java:130)
 [flink-client-executor-core-1.17.0-011_pre.jar:?]at 
com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.main(FlinkK8sClientExecutor.java:55)
 [flink-client-executor-core-1.17.0-011_pre.jar:?]Caused by: 
java.lang.RuntimeException: JavaCodeSplitter failed. This is a bug. Please file 
an issue.at 
org.apache.flink.table.codesplit.JavaCodeSplitter.split(JavaCodeSplitter.java:37)
 ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.runtime.generated.GeneratedClass.(GeneratedClass.java:58)
 ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.runtime.generated.GeneratedOperator.(GeneratedOperator.java:43)
 ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator$.generateOneInputStreamOperator(OperatorCodeGenerator.scala:130)
 ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:60)
 ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
 ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:100)
 ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
 ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257)
 ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecUnion.translateToPlanInternal(CommonExecUnion.java:61)
 ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
 ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257)
 ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:145)
 ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161)
 ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
 ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]at 

[jira] [Created] (FLINK-34111) Add JSON_QUOTE and JSON_UNQUOTE function

2024-01-16 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-34111:
--

 Summary: Add JSON_QUOTE and JSON_UNQUOTE function
 Key: FLINK-34111
 URL: https://issues.apache.org/jira/browse/FLINK-34111
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Martijn Visser


Escapes or unescapes a JSON string removing traces of offending characters that 
could prevent parsing.

Proposal:
- JSON_QUOTE: Quotes a string by wrapping it with double quote characters and 
escaping interior quote and other characters, then returning the result as a 
utf8mb4 string. Returns NULL if the argument is NULL.
- JSON_UNQUOTE: Unquotes value and returns the result as a string. Returns NULL 
if the argument is NULL. An error occurs if the value starts and ends with 
double quotes but is not a valid JSON string literal.

The following characters are reserved in JSON and must be properly escaped to 
be used in strings:

Backspace is replaced with \b
Form feed is replaced with \f
Newline is replaced with \n
Carriage return is replaced with \r
Tab is replaced with \t
Double quote is replaced with \"
Backslash is replaced with \\

This function exists in MySQL: 
- 
https://dev.mysql.com/doc/refman/8.0/en/json-creation-functions.html#function_json-quote
- 
https://dev.mysql.com/doc/refman/8.0/en/json-modification-functions.html#function_json-unquote

It's still open in Calcite CALCITE-3130
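
To make the intended escaping concrete, a rough illustrative sketch of the
JSON_QUOTE rules listed above (following the MySQL semantics; not Flink or
Calcite code):

public class JsonQuoteSketch {

    /** Wraps the input in double quotes and escapes the characters listed above; NULL in, NULL out. */
    static String jsonQuote(String s) {
        if (s == null) {
            return null;
        }
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '\b': sb.append("\\b"); break;
                case '\f': sb.append("\\f"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                default:   sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        System.out.println(jsonQuote("a\"b\nc")); // prints "a\"b\nc"
    }
}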



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34110) Bump janino to 3.1.11

2024-01-16 Thread Sergey Nuyanzin (Jira)
Sergey Nuyanzin created FLINK-34110:
---

 Summary: Bump janino to 3.1.11
 Key: FLINK-34110
 URL: https://issues.apache.org/jira/browse/FLINK-34110
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Planner
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin


Among others, it fixes {{ArrayIndexOutOfBoundsException}} for unconditional
loops which cannot complete normally:
https://github.com/janino-compiler/janino/issues/208



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34109) FileSystem sink connector restore job from historical checkpoint failure

2024-01-16 Thread Sergey Paryshev (Jira)
Sergey Paryshev created FLINK-34109:
---

 Summary: FileSystem sink connector restore job from historical 
checkpoint failure
 Key: FLINK-34109
 URL: https://issues.apache.org/jira/browse/FLINK-34109
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.17.2, 1.16.3, 1.18.0, 1.15.4, 1.14.6, 1.13.6, 1.12.7
Reporter: Sergey Paryshev


The FileSystem connector sink can't restore a job from a historical checkpoint (when
MAX_RETAINED_CHECKPOINTS > 1 and the restoring checkpoint is not the last one)
{code:java}
java.io.UncheckedIOException: java.io.FileNotFoundException: File 
file:/tmp/parquet-test/output/.uncompacted-part-81340e1d-9004-4ce2-a45c-628d17919bbf-0-1
 does not exist or the user running Flink ('user') has insufficient permissions 
to access it.
    at 
org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:165)
 ~[classes/:?]
    at 
org.apache.flink.connector.file.table.BinPacking.pack(BinPacking.java:40) 
~[classes/:?]
    at 
org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$2(CompactCoordinator.java:175)
 ~[classes/:?]
    at java.util.HashMap.forEach(HashMap.java:1290) ~[?:1.8.0_312]
    at 
org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(CompactCoordinator.java:171)
 ~[classes/:?]
    at 
org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.commitUpToCheckpoint(CompactCoordinator.java:153)
 ~[classes/:?]
    at 
org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(CompactCoordinator.java:143)
 ~[classes/:?]
    at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:262)
 ~[classes/:?]
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:155)
 ~[classes/:?]
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
 ~[classes/:?]
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[classes/:?]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:554)
 ~[classes/:?]
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:245)
 ~[classes/:?]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:848)
 ~[classes/:?]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:797) 
~[classes/:?]
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954)
 ~[classes/:?]
    at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:933) 
~[classes/:?]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:747) 
~[classes/:?]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
~[classes/:?]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
Caused by: java.io.FileNotFoundException: File 
file:/tmp/parquet-test/output/.uncompacted-part-81340e1d-9004-4ce2-a45c-628d17919bbf-0-1
 does not exist or the user running Flink ('user') has insufficient permissions 
to access it.
    at 
org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:113)
 ~[classes/:?]
    at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.getFileStatus(SafetyNetWrapperFileSystem.java:65)
 ~[classes/:?]
    at 
org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.lambda$coordinate$1(CompactCoordinator.java:163)
 ~[classes/:?]
    ... 19 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34108) Add URL_ENCODE and URL_DECODE function

2024-01-16 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-34108:
--

 Summary: Add URL_ENCODE and URL_DECODE function
 Key: FLINK-34108
 URL: https://issues.apache.org/jira/browse/FLINK-34108
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: Martijn Visser


Add URL_ENCODE and URL_DECODE function

URL_ENCODE(str) - Translates a string into 'application/x-www-form-urlencoded' 
format using a specific encoding scheme. 
URL_DECODE(str) - Decodes a string in 'application/x-www-form-urlencoded' 
format using a specific encoding scheme. 

Related ticket from Calcite: CALCITE-5825
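
For reference, the expected behaviour roughly matches the JDK's built-in
'application/x-www-form-urlencoded' codecs; a small illustrative sketch (not the
proposed Flink implementation):

import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class UrlCodecSketch {
    public static void main(String[] args) {
        // Roughly what URL_ENCODE('https://flink.apache.org/?q=a b') would return.
        String encoded = URLEncoder.encode("https://flink.apache.org/?q=a b", StandardCharsets.UTF_8);
        System.out.println(encoded); // https%3A%2F%2Fflink.apache.org%2F%3Fq%3Da+b

        // Roughly what URL_DECODE(...) would return: the original string again.
        System.out.println(URLDecoder.decode(encoded, StandardCharsets.UTF_8));
    }
}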



--
This message was sent by Atlassian Jira
(v8.20.10#820010)