Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for Checkpoints

2023-04-06 Thread Zakelly Lan
Hi Piotr,

Thanks for all the feedback.

(1) Thanks for the reminder. I have just read FLINK-24611. The delayed
deletion by the JM resolves some synchronization problems between the JM and
the TM, but I'm afraid it is still not feasible for the file sharing in this
FLIP. Consider the following concurrent checkpoint scenario:
   a. Checkpoint 1 finishes. 1.sst, 2.sst and 3.sst are written to file 1,
and 4.sst is written to file 2.
   b. Checkpoint 2 starts based on checkpoint 1, including 1.sst, 2.sst and
5.sst.
   c. Checkpoint 3 starts based on checkpoint 1, also including 1.sst, 2.sst
and 5.sst.
   d. Checkpoint 3 reuses file 2; the TM writes 5.sst to it.
   e. Checkpoint 2 creates a new file 3; the TM writes 5.sst to it.
   f. Checkpoint 2 finishes; checkpoint 1 is subsumed and file 2 is
deleted, while checkpoint 3 still needs file 2.

I attached a diagram to describe the scenario.
[image: concurrent cp.jpg]
The core issue is that this FLIP introduces a mechanism that allows
physical files to be used by the next several checkpoints. The JM cannot
know whether some TM will continue writing to a specific file. So in this
FLIP, the TMs take the responsibility for deleting the physical files.
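The bookkeeping implied by steps a-f above can be sketched as a small self-contained model (the class and method names below are purely illustrative, not actual Flink APIs): each physical file carries the set of checkpoints still referencing it, and the TM deletes a file only when that set drains. Replaying the scenario shows why file 2 must survive the subsumption of checkpoint 1:

```java
import java.util.*;

// Hypothetical model of TM-side ownership: a physical file is deleted only
// when no checkpoint references it anymore.
public class TmFileOwnership {
    static Map<String, Set<Integer>> refs = new HashMap<>();

    static void use(String file, int checkpoint) {
        refs.computeIfAbsent(file, f -> new HashSet<>()).add(checkpoint);
    }

    // Called when a checkpoint is subsumed; returns the files actually deleted.
    static List<String> subsume(int checkpoint) {
        List<String> deleted = new ArrayList<>();
        Iterator<Map.Entry<String, Set<Integer>>> it = refs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<Integer>> e = it.next();
            e.getValue().remove(checkpoint);
            if (e.getValue().isEmpty()) {
                deleted.add(e.getKey());
                it.remove();
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        use("file-1", 1); use("file-2", 1);  // step a: checkpoint 1 uses files 1 and 2
        use("file-1", 3); use("file-2", 3);  // steps c-d: checkpoint 3 reuses file 2
        use("file-1", 2); use("file-3", 2);  // step e: checkpoint 2 writes new file 3
        // step f: checkpoint 1 is subsumed, but nothing may be deleted yet,
        // because checkpoint 3 still references file 2.
        System.out.println(subsume(1));                 // prints []
        System.out.println(refs.containsKey("file-2")); // prints true
    }
}
```

A JM that deletes file 2 immediately on subsumption of checkpoint 1 would break checkpoint 3; only the writer TM has the reference information above.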

(2) IIUC, the RecoverableWriter is introduced to persist data in the
"in-progress" files after each checkpoint, and the implementation may be
based on file sync in some file systems. However, since sync is a heavy
operation on a DFS, this FLIP uses flush instead of sync on a best-effort
basis. This only fits the case where the DFS is considered reliable. The
problems the two mechanisms solve are different.
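The flush-versus-sync distinction can be illustrated with plain java.io (a minimal local-filesystem sketch, not Flink's FileSystem abstraction): flush() only hands buffered bytes to the operating system, while sync() additionally forces them to durable storage, which is the expensive call on a DFS.

```java
import java.io.*;
import java.nio.file.*;

// Minimal sketch: flush() pushes buffered data to the OS page cache;
// sync() (fsync) forces it to durable storage and is far more expensive.
public class FlushVsSync {
    public static void main(String[] args) throws IOException {
        Path p = Files.createTempFile("checkpoint-part", ".bin");
        try (FileOutputStream out = new FileOutputStream(p.toFile())) {
            out.write("1.sst".getBytes());
            out.flush();        // cheap: data reaches the OS, visible to readers
            out.getFD().sync(); // expensive: data forced to disk
        }
        System.out.println(Files.size(p)); // prints 5
        Files.delete(p);
    }
}
```

Relying on flush alone trades durability guarantees for throughput, which is exactly the "DFS is considered reliable" assumption stated above.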

(3) Yes, if files are managed by the JM via the shared registry, this
problem is solved. But as I mentioned in (1), there are other corner cases
that are hard to resolve via the shared registry.

The goal of this FLIP is to have a common way of merging files in all use
cases. For shared state it merges at subtask level, while for private state
(and changelog files, as I replied to Yanfei), files are merged at TM
level. So it is not contrary to the current plan for the unaligned
checkpoint state (FLINK-26803). You are right that the unaligned checkpoint
state would be merged with the operator's state file, so overall, it is
slightly better than what's currently done.


Thanks again for the valuable comments!

Best regards,
Zakelly



On Wed, Apr 5, 2023 at 8:43 PM Piotr Nowojski  wrote:

> Hi,
>
> Thanks for coming up with the proposal, it's definitely valuable. I'm still
> reading and trying to understand the proposal, but a couple of comments
> from my side.
>
> (1)
> > Ownership of a single checkpoint file is transferred to TM, while JM
> manages the parent directory of these files.
>
> Have you seen https://issues.apache.org/jira/browse/FLINK-24611 before? I
> don't fully remember why, but we have rejected the idea of moving the file
> ownership to TM and instead reworked the shared file registry in a way that
> I think should be sufficient for file sharing. Could you elaborate why we
> need to move the file ownership to TM, and why is the current mechanism not
> sufficient?
>
> (2)
> > File visibility is needed when a Flink job recovers after a checkpoint is
> materialized. In some DFS, such as most object storages, a file is only
> visible after it is closed
>
> Is that really the case?
> `org.apache.flink.core.fs.FileSystem#createRecoverableWriter` seems to be
> addressing exactly this issue, and the most frequently used FileSystem (S3)
> AFAIK supports it with no problems?
>
> (3)
> > 4.1.2 Merge files within a subtask or a TM
> > Given that TMs are reassigned after restoration, it is difficult to
> manage physical files that contain data from multiple subtasks scattered
> across different TMs (as depicted in Fig.3). There is no synchronization
> mechanism between TMs, making file management in this scenario challenging.
>
> I think this is solved in many places already via the shared state managed
> by the JM, as I mentioned in (1).
>
>
> If I understand it correctly you are proposing to have a common
> interface/way of merging small files, in all use cases, that would work
> only across a single subtask? That's contrary to what's currently done for
> unaligned checkpoints, right? But if this generic mechanism was to be used
> for unaligned checkpoints, unaligned checkpoint state would have been
> merged with the operators state file, so all in all there would be no
> regression visible to a user? The limit is that we always have at least a
> single file per subtask, but in exchange we are getting a simpler threading
> model?
>
> Best,
> Piotrek
>
> On Tue, Apr 4, 2023 at 08:51 Zakelly Lan  wrote:
>
> > Hi Yanfei,
> >
> > Thank you for your prompt response.
> >
> > I agree that managing (deleting) only some folders with JM can greatly
> > relieve JM's burden. Thanks for pointing this out.
> >
> > In general, merging at the TM level is more effective since there are
> > usually more files to merge. Therefore,

Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement

2023-04-06 Thread yuxia
Hi everyone.

If there are no other questions or concerns for the FLIP[1], I'd like to start 
the vote next Monday (4.10).

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-302%3A+Support+TRUNCATE+TABLE+statement

Best regards,
Yuxia

----- Original Message -----
From: "yuxia" 
To: "dev" 
Sent: Friday, March 24, 2023, 11:27:42 AM
Subject: Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement

Thanks all for your feedback.

@Shammon FY
My gut feeling is that the end user shouldn't care whether the TRUNCATE 
TABLE statement deletes the directory or moves it to a Trash directory. They 
only need to know it will delete all rows from a table.
To me, deleting the directory or moving it to trash is more a behavior of the 
external storage level than of the SQL statement level. In Hive, if the user 
configures Trash, files are moved to trash for the DROP statement.
Also, I have hardly seen such usage of the TRUNCATE TABLE statement in other 
engines. What's more, to support it we would have to extend the TRUNCATE 
TABLE syntax, which would then not be compliant with the SQL standard. I 
really don't want to do that, and I believe it would confuse users if we did.

@Hang
`TRUNCATE TABLE` is meant to delete all rows of a base table, so it makes no 
sense for a table source to implement it.
If a user uses the TRUNCATE TABLE statement to truncate a table, the planner 
will only try to find the DynamicTableSink for the corresponding table.
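As a rough illustration of how such a sink-side ability could look (the interface and method names below are hypothetical placeholders used for the sketch, not the final API from the FLIP), a sink opts in by implementing a truncation ability, and executing TRUNCATE TABLE dispatches to it:

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of a connector-facing truncation ability. Names are
// illustrative; see the FLIP for the actual proposed interface.
public class TruncateSketch {
    interface SupportsTruncate {
        // Invoked by the planner when TRUNCATE TABLE is executed on this table.
        void executeTruncation();
    }

    // A toy sink backed by an in-memory list standing in for table data.
    static class InMemorySink implements SupportsTruncate {
        List<String> rows = new ArrayList<>(List.of("a", "b"));
        @Override public void executeTruncation() { rows.clear(); }
    }

    public static void main(String[] args) {
        InMemorySink sink = new InMemorySink();
        sink.executeTruncation();             // what "TRUNCATE TABLE t" dispatches to
        System.out.println(sink.rows.size()); // prints 0
    }
}
```

The key design point matches the reply above: only the DynamicTableSink side needs to participate, so sources are untouched.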

@Ran Tao
1: Thanks for your reminder. I said in the FLIP that views won't be 
supported, but forgot to say that temporary tables are also not supported. I 
have now added this part to the FLIP.

2: Yes, I also considered including it in this FLIP. But as far as I can 
see, there isn't much usage of truncating a table with a partition. It's not 
as useful as truncating a whole table, so I tend to keep this FLIP simple 
here without supporting truncate table with partition.
Also, it seems that for `truncate table with partition`, different engines 
have different syntax:
Hive[1]/Spark[2] use the following syntax:
TRUNCATE TABLE table_name [PARTITION partition_spec]

SqlServer[3] uses the following syntax:
TRUNCATE TABLE { database_name.schema_name.table_name | schema_name.table_name 
| table_name } [ WITH ( PARTITIONS ( {  |  }
So, I tend to be cautious about it.

But I'm open to this. If there's any feedback or a strong requirement, I 
don't mind adding it to this FLIP.
If we do need it some day, I can propose it in a new FLIP. It won't break 
the current design.

As for the concrete syntax in the FLIP, I think the current one is the 
concrete syntax; we don't allow the TABLE keyword to be optional.

3: Thanks for your reminder, I have updated the FLIP accordingly.


[1]https://cwiki.apache.org/confluence/display/hive/languagemanual+ddl#LanguageManualDDL-TruncateTable
[2]https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-ddl-truncate-table.html
[3]https://learn.microsoft.com/en-us/sql/t-sql/statements/truncate-table-transact-sql?view=sql-server-ver16



Best regards,
Yuxia

----- Original Message -----
From: "Ran Tao" 
To: "dev" 
Sent: Thursday, March 23, 2023, 6:28:17 PM
Subject: Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement

Hi, yuxia.

Thanks for starting the discussion.
I think it's a nice improvement to support the TRUNCATE TABLE statement
because many other mature engines support it.

I have some questions.
1. Since tables have different types, will we support views or temporary
tables?

2. Some other engines such as Spark and Hive support TRUNCATE TABLE with a
partition. Will we support that?
Btw, I think you need to give the concrete TRUNCATE TABLE syntax in the
FLIP, because some engines have different syntaxes.
For example, Hive allows TRUNCATE [TABLE], which means the TABLE keyword
can be optional.

3. The Proposed Changes try to use SqlToOperationConverter and run in
TableEnvironmentImpl#executeInternal.
I think that is out of date: the community is refactoring the conversion
logic from SqlNode to Operation[1] and the execution in
TableEnvironmentImpl[2].
I suggest using the new approach to support it.

[1] https://issues.apache.org/jira/browse/FLINK-31464
[2] https://issues.apache.org/jira/browse/FLINK-31368

Best Regards,
Ran Tao
https://github.com/chucheng92


yuxia wrote on Wed, Mar 22, 2023 at 21:13:

> Hi, devs.
>
> I'd like to start a discussion about FLIP-302: Support TRUNCATE TABLE
> statement [1].
>
> The TRUNCATE TABLE statement is a SQL command that allows users to quickly
> and efficiently delete all rows from a table without dropping the table
> itself. This statement is commonly used in data warehouse, where large data
> sets are frequently loaded and unloaded from tables.
> So, this FLIP is meant to support the TRUNCATE TABLE statement. More exactly,
> this FLIP will bring Flink the TRUNCATE TABLE syntax and an interface with
> which the corresponding connectors can implement their own logic for
> truncating tables.
>
> Looking forward to your feedback.
>
> [1]: [
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-302%3A+Support+

[jira] [Created] (FLINK-31743) Avoid relocating the RocksDB's log failure when filename exceeds 255 characters

2023-04-06 Thread jinghaihang (Jira)
jinghaihang created FLINK-31743:
---

 Summary: Avoid relocating the RocksDB's log failure when filename 
exceeds 255 characters
 Key: FLINK-31743
 URL: https://issues.apache.org/jira/browse/FLINK-31743
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Affects Versions: 1.15.4, 1.16.1
Reporter: jinghaihang
 Fix For: 1.16.1


Since FLINK-24785, the file name of the RocksDB LOG is generated by parsing 
the db path. When the db path is long and the resulting filename exceeds 255 
characters, creating the file fails, so the relevant RocksDB LOG cannot be 
seen in the Flink log dir.
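One possible shape of a guard (purely illustrative, not the actual patch for this ticket): cap the derived file name at the common 255-character filesystem limit, keeping a hash of the full path for uniqueness.

```java
// Illustrative sketch: shorten a path-derived RocksDB log file name so it
// never exceeds the 255-character filename limit of most filesystems.
public class RocksdbLogName {
    static final int MAX_FILENAME = 255;

    static String logFileName(String dbPath) {
        String name = dbPath.replace('/', '_') + "_LOG";
        if (name.length() <= MAX_FILENAME) {
            return name;
        }
        // Keep a recognizable suffix plus a hash of the full path for uniqueness.
        String suffix = "..." + Integer.toHexString(dbPath.hashCode()) + "_LOG";
        return name.substring(0, MAX_FILENAME - suffix.length()) + suffix;
    }

    public static void main(String[] args) {
        String longPath = "/tmp/" + "x".repeat(300) + "/db";
        String name = logFileName(longPath);
        System.out.println(name.length() <= 255); // prints true
        System.out.println(name.endsWith("_LOG")); // prints true
    }
}
```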



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31744) Extend Adaptive Scheduler sparse EG to contain maxParallelism

2023-04-06 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-31744:


 Summary: Extend Adaptive Scheduler sparse EG to contain 
maxParallelism
 Key: FLINK-31744
 URL: https://issues.apache.org/jira/browse/FLINK-31744
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / REST
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.18.0


When a job is WaitingForResources, the adaptive scheduler returns a sparse 
execution graph that omits many details that are only known at execution time 
(like subtasks).

We could include all JobVertex-level information though, which would cover 
things like the vertex id/name and the maxParallelism.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31745) Performance regression on serializerHeavyString since April 3rd

2023-04-06 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-31745:
--

 Summary: Performance regression on serializerHeavyString since 
April 3rd
 Key: FLINK-31745
 URL: https://issues.apache.org/jira/browse/FLINK-31745
 Project: Flink
  Issue Type: Bug
Reporter: Martijn Visser
 Fix For: 1.18.0


serializerHeavyString baseline=241.682406 current_value=203.24132

http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=serializerHeavyString&extr=on&quarts=on&equid=off&env=2&revs=200



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for Checkpoints

2023-04-06 Thread Yun Tang
Hi Zakelly,

Thanks for driving this work!

I'm not sure whether you have read the discussion among Stephan, Roman, 
Piotr, Yuan and me in the design doc [1] from nearly two years ago.

From my understanding, your proposal is also a mixed state ownership: some 
states are owned by the TM while some are owned by the JM. If my memory is 
correct, we did not take option-3 or option-5 in the design doc [1] due to 
the code complexity when implementing the 1st version of the changelog 
state backend.

Could you also compare the current FLIP with the proposals in the design 
doc [1]? From my understanding, we should at least consider comparing with 
option-3 and option-5, as they are both mixed solutions.


[1] 
https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#

Best
Yun Tang



Re: [ANNOUNCE] Kafka Connector Code Removal from apache/flink:main branch and code freezing

2023-04-06 Thread Chesnay Schepler

Do we have to move Debezium _now_?

There is no hard dependency between debezium-json and the Kafka 
connector, nor does the format depend on Kafka afaict.
So is this only about the e2e test that uses debezium-json + kafka 
connector?


If so, then I would suggest to put debezium-json issue aside for now.
Move the e2e test to the connector repo, but leave the format in Flink.

We will still have reasonable test coverage, and can tackle this later 
on; but I wouldn't want to delay the Kafka externalization further 
because of this.


On 06/04/2023 06:29, Mason Chen wrote:

Hi all,

I am looking for feedback on removing Kafka code from apache/flink:master:
https://issues.apache.org/jira/browse/FLINK-30859. If it wasn't clear, this
involves moving more documentation, Kafka examples, Kafka related formats,
and test related code to the new repo.

Regarding the Kafka related formats, I would like to see if anyone has any
objections, as it is not as straightforward as moving modules to the
external repo. This required moving the confluent-avro and debezium-json
code from the flink-formats into new modules. The confluent-avro part is
straightforward and it is relocated under a new module called
flink-formats-kafka. However, the debezium-json code required moving only
the debezium part of flink-json and the artifactId has been changed to
"flink-json-debezium". The code for debezium format is unfortunately
scattered across multiple modules (flink-avro-confluent-registry +
flink-json) leading to this situation and it is not obvious since debezium
code doesn't have an explicit Kafka dependency. Thanks to Gordon for
catching this!

I would suggest:
1. Move the debezium part of flink-json to external repo.
2. Create a new module under the external repo called
flink-formats-kafka/flink-json-debezium where code will be located under
artifactId `flink-json-debezium`. Update documentation so SQL users refer
to this new artifact for the pluggable format.
3. Keep the debezium code in apache/flink:master to preserve backward
compatibility for the flink-json dependency, and mark it as deprecated to
signal that new code contributions should go to the external repo.

Here is a PR [1] if you would like to see the changes for yourself. Any
objections?

[1] https://github.com/apache/flink-connector-kafka/pull/16

Best,
Mason

On Mon, Mar 27, 2023 at 8:26 AM Tzu-Li (Gordon) Tai 
wrote:


Thanks for the updates.

So far the above mentioned issues seem to all have PRs against
apache/flink-connector-kafka now.

To be clear, this notice isn't about discussing _what_ PRs we will be
merging or not merging - we should try to review all of them eventually.
The only reason I've made a list of PRs in the original notice is just to
make it visible which PRs we need to reopen against
apache/flink-connector-kafka due to the code removal.

Thanks,
Gordon

On Sun, Mar 26, 2023 at 7:07 PM Jacky Lau  wrote:


Hi Gordon. https://issues.apache.org/jira/browse/FLINK-31006 is also
a critical bug in Kafka: the job will not exit after all partitions are
consumed when the JobManager fails over in pipeline mode running an
unbounded source. I talked with @PatrickRen offline; we didn't have a
suitable way to fix it before, and we will solve it this week.

Shammon FY wrote on Sat, Mar 25, 2023 at 13:13:


Thanks Jing and Gordon, I have closed the pr
https://github.com/apache/flink/pull/21965 and will open a new one for
kafka connector


Best,
shammon FY


On Saturday, March 25, 2023, Ran Tao  wrote:


Thank you Gordon and all the people who have worked on the externalized
Kafka implementation.
I have another PR related to Kafka[1]. I will be very appreciative if you
can help me review it in your free time.

[1] https://github.com/apache/flink-connector-kafka/pull/10

Best Regards,
Ran Tao


Tzu-Li (Gordon) Tai wrote on Fri, Mar 24, 2023 at 23:21:


Thanks Jing! I missed https://github.com/apache/flink/pull/21965 indeed.

Please let us know if anything else was overlooked.

On Fri, Mar 24, 2023 at 8:13 AM Jing Ge wrote:

Thanks Gordon for driving this! There is another PR related to the Kafka
connector: https://github.com/apache/flink/pull/21965

Best regards,
Jing

On Fri, Mar 24, 2023 at 4:06 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

Hi all,

Now that Flink 1.17 has been released, and given that we've already synced
the latest Kafka connector code up to Flink 1.17 to the
apache/flink-connector-kafka repo (thanks to Mason and Martijn for most of
the effort!), we're now in the final step of completely removing the Kafka
connector code from the apache/flink:main branch, tracked by FLINK-30859
[1].

As such, we'd like to ask that no more Kafka connector changes get merged
to apache/flink:main, effective now. Going forward, all Kafka connector PRs
should be opened directly against the apache/flink-connector-kafka:main
branch.

Meanwhile, there's a couple of "dangling" Kafka connector PRs over the last
2 months that is opened

[jira] [Created] (FLINK-31746) Batch workload output completes while the job client fails

2023-04-06 Thread Haoze Wu (Jira)
Haoze Wu created FLINK-31746:


 Summary: Batch workload output completes while the job client fails
 Key: FLINK-31746
 URL: https://issues.apache.org/jira/browse/FLINK-31746
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.14.0
Reporter: Haoze Wu


We are doing testing on Flink 1.14.0 (we know 1.14.0 is no longer supported, 
so we are also testing Flink 1.17.0 to see if it has the same issue). We run 
a batch processing job. The input of the job is a file on disk; the output of 
the job is a Kafka topic, which should receive 170 messages when the workload 
finishes. In the testing, we inject a fault (an IOException) in a 
TaskManager, and then the batch processing job client fails:

 
{code:java}
2023-03-26T19:05:48,922 ERROR cli.CliFrontend 
(CliFrontend.java:handleError(923)) - Error while running the 
command.org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
85c9bd56d6dd111f858b4b5a99551c53) {code}
The IOException occurs in 
`BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader` when running 
`FileChannel.open`. It has multiple chances to occur in a workload.

 

 
{code:java}
    FileRegionReader(Path filePath) throws IOException {
        this.fileChannel = FileChannel.open(filePath, StandardOpenOption.READ);
        this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
    }
 {code}
The call stack of this fault site:
{code:java}
(org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader$FileRegionReader,,200),
 
(org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionDirectTransferReader,,74),
 
(org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartition,createReadView,221),
 
(org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition,createSubpartitionView,205),
 
(org.apache.flink.runtime.io.network.partition.ResultPartitionManager,createSubpartitionView,76),
 
(org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel,requestSubpartition,133),
 
(org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate,internalRequestPartitions,330),
 
(org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate,requestPartitions,299),
 
(org.apache.flink.runtime.taskmanager.InputGateWithMetrics,requestPartitions,127),
 
(org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1,runThrowing,50),
(org.apache.flink.streaming.runtime.tasks.mailbox.Mail,run,90), 
(org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,processMailsNonBlocking,353),
 
(org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,processMail,319),
 
(org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor,runMailboxLoop,201),
 (org.apache.flink.streaming.runtime.tasks.StreamTask,runMailboxLoop,809),
(org.apache.flink.streaming.runtime.tasks.StreamTask,invoke,761),
(org.apache.flink.runtime.taskmanager.Task,runWithSystemExitMonitoring,958),
(org.apache.flink.runtime.taskmanager.Task,restoreAndInvoke,937),
(org.apache.flink.runtime.taskmanager.Task,doRun,766),
(org.apache.flink.runtime.taskmanager.Task,run,575),
(java.lang.Thread,run,748) {code}
 

We inspected the names of the threads where the fault occurs and found that 
our workload can be divided into these tasks:

Split Reader: Custom File Source -> Flat Map (1/8)#0
...
Split Reader: Custom File Source -> Flat Map (8/8)#0
Keyed Aggregation -> Map -> Sink Unnamed Writer (1/8)#0
...
Keyed Aggregation -> Map -> Sink Unnamed Writer (8/8)#0
Sink Unnamed Committer (1/1)#0

Such a fault during "Split Reader" or "Keyed Aggregation" triggers this "Job 
failed" message, and our Kafka topic can't receive the complete correct 
output (i.e., fewer than 170 messages). However, if the exception happens 
during "Sink Unnamed Committer", the client still reports "Job failed", even 
though our Kafka topic has already received everything it expects.

We assume that our workload is translated into a few steps: "Custom File 
Source -> Flat Map", "Keyed Aggregation -> Map -> Sink Unnamed Writer", and 
"Sink Unnamed Committer". The last one is responsible for some "commit" and 
does not affect our end-to-end results. However, a fault in the "commit" 
stage still reports a "failure" to the job client, which may confuse the job 
client.

We have some questions about the design rationales:
 # In some workloads such as ours, the final "commit" seems not to matter 
that much. Can its failure be seen as tolerable?
 # The client log is confusing. It shows tons of exceptions, but it does not 
show in which stage of the workload the failure happened. The most useful 
information for the client is something like "Sink Unnamed Committer (1/1)#0 
(7b19f0a2f247b8f38fe9141c9872ef58) switched f

[jira] [Created] (FLINK-31747) Externalize debezium from flink-json

2023-04-06 Thread Mason Chen (Jira)
Mason Chen created FLINK-31747:
--

 Summary: Externalize debezium from flink-json
 Key: FLINK-31747
 URL: https://issues.apache.org/jira/browse/FLINK-31747
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, 
SequenceFile)
Affects Versions: 1.18.0
Reporter: Mason Chen


The debezium code from flink-json should move to the external Kafka repo. 
However, we need to ensure backward compatibility with dependencies 
referencing `flink-json`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31748) Adapt SplitFetcherManager#removeSplit for flink-connector-pulsar

2023-04-06 Thread Zili Chen (Jira)
Zili Chen created FLINK-31748:
-

 Summary: Adapt SplitFetcherManager#removeSplit for 
flink-connector-pulsar
 Key: FLINK-31748
 URL: https://issues.apache.org/jira/browse/FLINK-31748
 Project: Flink
  Issue Type: Sub-task
Reporter: Zili Chen
Assignee: Yufan Sheng






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31749) The Using Hadoop OutputFormats example is not available for DataStream

2023-04-06 Thread junzhong qin (Jira)
junzhong qin created FLINK-31749:


 Summary: The Using Hadoop OutputFormats example is not available 
for DataStream
 Key: FLINK-31749
 URL: https://issues.apache.org/jira/browse/FLINK-31749
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.15.4, 1.17.0
Reporter: junzhong qin


The following example from the doc shows how to use Hadoop's 
{{TextOutputFormat}}: 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/formats/hadoop/#using-hadoop-outputformats
But DataStream has no output() method.
{code:java}
// Obtain the result we want to emit
DataStream<Tuple2<Text, IntWritable>> hadoopResult = [...]

// Set up the Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
  // create the Flink wrapper.
  new HadoopOutputFormat<Text, IntWritable>(
    // set the Hadoop OutputFormat and specify the job.
    new TextOutputFormat<Text, IntWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", 
" ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// Emit data using the Hadoop TextOutputFormat.
hadoopResult.output(hadoopOF); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31750) Hash Keys are duplicate when join reorder happens in stream mode

2023-04-06 Thread ZhengYi Weng (Jira)
ZhengYi Weng created FLINK-31750:


 Summary: Hash Keys are duplicate when join reorder happens in 
stream mode
 Key: FLINK-31750
 URL: https://issues.apache.org/jira/browse/FLINK-31750
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: ZhengYi Weng
 Attachments: image-2023-04-07-10-39-13-831.png

When I run `JoinReorderTestBase#testAllInnerJoin`, I find that the hash keys 
are duplicated. !image-2023-04-07-10-39-13-831.png|width=571,height=263!

The reason this happens is that when joins are reordered, the join condition 
changes and can generate duplicate column conditions; for example, the 
condition of T1 join (T4 join T5) is a1 = a4 and a1 = a5. It can be fixed if 
the columns in `StreamPhysicalJoinRuleBase#onMatch#toHashTraitByColumns` are 
not duplicated.

I will fix it.
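The fix direction can be sketched as follows (illustrative only, not the planner patch): deduplicate the hash key column indices before building the hash distribution, preserving their first-occurrence order.

```java
import java.util.Arrays;

// Illustrative sketch: drop duplicate column indices from a hash key list,
// keeping the first occurrence of each column.
public class DedupHashKeys {
    static int[] dedup(int[] hashKeys) {
        // IntStream.distinct() preserves encounter order.
        return Arrays.stream(hashKeys).distinct().toArray();
    }

    public static void main(String[] args) {
        // e.g. a1 = a4 and a1 = a5 can yield the same key column twice after reorder
        int[] keys = {0, 2, 0};
        System.out.println(Arrays.toString(dedup(keys))); // prints [0, 2]
    }
}
```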



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement

2023-04-06 Thread Jingsong Li
+1 for voting.

Best,
Jingsong

On Thu, Apr 6, 2023 at 4:52 PM yuxia  wrote:
>
> Hi everyone.
>
> If there are no other questions or concerns for the FLIP[1], I'd like to 
> start the vote next Monday (4.10).
>
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-302%3A+Support+TRUNCATE+TABLE+statement
>
> Best regards,
> Yuxia
>
> - 原始邮件 -
> 发件人: "yuxia" 
> 收件人: "dev" 
> 发送时间: 星期五, 2023年 3 月 24日 上午 11:27:42
> 主题: Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement
>
> Thanks all for your feedback.
>
> @Shammon FY
> My gut feeling is that the end user shouldn't care whether the TRUNCATE TABLE 
> statement deletes the directory or moves it to a Trash directory. They 
> only need to know that it will delete all rows from a table.
> To me, deleting the directory or moving it to trash is more a behavior of the 
> external storage level than of the SQL statement level. In Hive, if the user 
> configures Trash, files will be moved to trash for the DROP statement.
> Also, I have hardly seen such usage of the TRUNCATE TABLE statement in other 
> engines. What's more, to support it, we would have to extend the TRUNCATE TABLE 
> syntax, which would then not be compliant with the SQL standard. I really don't 
> want to do that, and I believe it would confuse users if we did.
>
> @Hang
> `TRUNCATE TABLE` is meant to delete all rows of a base table. So, it makes no 
> sense for a table source to implement it.
> If a user uses the TRUNCATE TABLE statement to truncate a table, the planner 
> will only try to
> find the DynamicTableSink for the corresponding table.
>
> @Ran Tao
> 1: Thanks for your reminder. I said it won't support views in the FLIP, but 
> forgot to say that temporary tables are also not supported. Now, I have added 
> this part to the FLIP.
>
> 2: Yes, I also considered including it in this FLIP before. But as far as I 
> can see, I haven't seen much usage of truncating a table with a partition. It's 
> not as useful as truncating a whole table. So, I tend to keep this FLIP simple 
> here without supporting truncate table with partition.
> Also, it seems that for `truncate table with partition`, different engines may 
> have different syntax:
> Hive[1]/Spark[2] use the following syntax:
> TRUNCATE TABLE table_name [PARTITION partition_spec]
>
> SqlServer[3] uses the following syntax:
> TRUNCATE TABLE { database_name.schema_name.table_name | 
> schema_name.table_name | table_name } [ WITH ( PARTITIONS ( { 
> <partition_number_expression> | <range> } [ , ...n ] ) ) ]
> So, I tend to be cautious about it.
>
> But I'm open to this. If there's any feedback or a strong requirement, I don't 
> mind adding it to this FLIP.
> If we do need it some day, I can propose it in a new FLIP. It won't break 
> the current design.
>
> As for the concrete syntax in the FLIP, I think the current one is the concrete 
> syntax; we don't allow the TABLE keyword to be optional.
>
> 3: Thanks for your reminder, I have updated the FLIP accordingly.
>
>
> [1]https://cwiki.apache.org/confluence/display/hive/languagemanual+ddl#LanguageManualDDL-TruncateTable
> [2]https://spark.apache.org/docs/3.0.0-preview/sql-ref-syntax-ddl-truncate-table.html
> [3]https://learn.microsoft.com/en-us/sql/t-sql/statements/truncate-table-transact-sql?view=sql-server-ver16
>
>
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Ran Tao" 
> To: "dev" 
> Sent: Thursday, March 23, 2023, 6:28:17 PM
> Subject: Re: [DISCUSS] FLIP-302: Support TRUNCATE TABLE statement
>
> Hi, yuxia.
>
> Thanks for starting the discussion.
> I think it's a nice improvement to support the TRUNCATE TABLE statement because
> many other mature engines support it.
>
> I have some questions.
> 1. Since tables have different types, will we support views or
> temporary tables?
>
> 2. Some other engines such as Spark and Hive support TRUNCATE TABLE with a
> partition. Will we support that?
> Btw, I think you need to give the concrete TRUNCATE TABLE syntax in the FLIP
> because some engines have different syntaxes.
> For example, Hive allows TRUNCATE TABLE to be TRUNCATE [TABLE], which means
> the TABLE keyword can be optional.
>
> 3. The Proposed Changes try to use SqlToOperationConverter and run in
> TableEnvironmentImpl#executeInternal.
> I think that's out of date; the community is refactoring the conversion logic
> from SqlNode to Operation[1] and the executions in TableEnvironmentImpl[2].
> I suggest using the new way to support it.
>
> [1] https://issues.apache.org/jira/browse/FLINK-31464
> [2] https://issues.apache.org/jira/browse/FLINK-31368
>
> Best Regards,
> Ran Tao
> https://github.com/chucheng92
>
>
yuxia  wrote on Wednesday, March 22, 2023 at 21:13:
>
> > Hi, devs.
> >
> > I'd like to start a discussion about FLIP-302: Support TRUNCATE TABLE
> > statement [1].
> >
> > The TRUNCATE TABLE statement is a SQL command that allows users to quickly
> > and efficiently delete all rows from a table without dropping the table
> > itself. This statement is commonly used in data warehouses, where large data
> > sets are frequently loaded and unloaded from tables.
> > So, this FLIP is meant to support the TRUNCATE TABLE statement.
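The execution path discussed in this thread, where the planner resolves only the table's DynamicTableSink and delegates the row deletion to it, can be sketched in a self-contained way. This is a minimal illustration under assumptions: the `SupportsTruncate` ability name and `executeTruncation()` method below are stand-ins, not necessarily the final Flink API, and `InMemorySink` is a toy sink invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

public class TruncateSketch {
    // Hypothetical sink-side ability: a sink that knows how to delete
    // all rows of the table it writes to. No table source is involved.
    interface SupportsTruncate {
        void executeTruncation();
    }

    // Toy sink holding its "table" in memory, for illustration only.
    static class InMemorySink implements SupportsTruncate {
        final List<String> rows = new ArrayList<>(List.of("r1", "r2"));

        @Override
        public void executeTruncation() {
            rows.clear(); // delete all rows, keep the table itself
        }
    }

    public static void main(String[] args) {
        InMemorySink sink = new InMemorySink();
        // A statement like "TRUNCATE TABLE t" would end up here:
        // the planner dispatches to the sink's truncation ability.
        sink.executeTruncation();
        System.out.println(sink.rows.size()); // prints 0
    }
}
```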