[jira] [Created] (FLINK-25496) ThreadDumpInfoTest.testComparedWithDefaultJDKImplemetation failed on azure

2021-12-30 Thread Yun Gao (Jira)
Yun Gao created FLINK-25496:
---

 Summary:  
ThreadDumpInfoTest.testComparedWithDefaultJDKImplemetation failed on azure
 Key: FLINK-25496
 URL: https://issues.apache.org/jira/browse/FLINK-25496
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Reporter: Yun Gao
 Fix For: 1.15.0


 
{code:java}
Dec 31 02:53:26 [ERROR] Failures: 
Dec 31 02:53:26 [ERROR]   
ThreadDumpInfoTest.testComparedWithDefaultJDKImplemetation:66 expected:<"main" 
[prio=5 ]Id=1 RUNNABLE
Dec 31 02:53:26 at ja...> but was:<"main" []Id=1 RUNNABLE
Dec 31 02:53:26 at ja...>
Dec 31 02:53:26 [INFO] 
Dec 31 02:53:26 [ERROR] Tests run: 5958, Failures: 1, Errors: 0, Skipped: 26
{code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=28779=logs=b0a398c0-685b-599c-eb57-c8c2a771138e=747432ad-a576-5911-1e2a-68c6bedc248a=13859

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25495) Client support attach mode when using the deployment of application mode

2021-12-30 Thread Junfan Zhang (Jira)
Junfan Zhang created FLINK-25495:


 Summary: Client support attach mode when using the deployment of 
application mode
 Key: FLINK-25495
 URL: https://issues.apache.org/jira/browse/FLINK-25495
 Project: Flink
  Issue Type: Improvement
  Components: Client / Job Submission
Reporter: Junfan Zhang






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] Apache Flink ML Release 2.0.0, release candidate #3

2021-12-30 Thread Zhipeng Zhang
+1 (non-binding)

- Verified that the checksums and GPG files match the corresponding release
files
- Verified that the source distributions do not contain any binaries
- Built the source distribution with Maven to ensure all source files have
Apache headers
- Verified that all POM files point to the same version
- Verified that the README.md file does not have anything unexpected
- Verified that the NOTICE and LICENSE follow the rules
- Checked JIRA release notes
- Checked source code tag "release-2.0.0-rc3"


Dong Lin wrote on Fri, Dec 31, 2021 at 09:09:

> +1 (non-binding)
>
> - Verified that the checksums and GPG files match the corresponding release
> files
> - Verified that the source distributions do not contain any binaries
> - Built the source distribution with Maven to ensure all source files have
> Apache headers
> - Verified that all POM files point to the same version
> - Verified that the README.md file does not have anything unexpected
> - Verified the NOTICE and LICENSE follows the rules specified in the wiki
> <
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release
> >
> .
> - Checked JIRA release notes
> - Checked source code tag "release-2.0.0-rc2"
> - Checked flink-web PR
>
>
>
> On Fri, Dec 31, 2021 at 1:24 AM Yun Gao  wrote:
>
> > Hi everyone,
> >
> >
> >
> > Please review and vote on the release candidate #3 for the version 2.0.0
> > of Apache Flink ML,
> >
> > as follows:
> >
> > [ ] +1, Approve the release
> >
> > [ ] -1, Do not approve the release (please provide specific comments)
> >
> >
> >
> > **Testing Guideline**
> >
> >
> >
> > You can find here [1] a page in the project wiki on instructions for
> > testing. To cast a vote, it is not necessary to perform all listed
> checks,
> > but please mention which checks you have performed when voting.
> >
> >
> >
> > **Release Overview**
> >
> >
> >
> > As an overview, the release consists of the following:
> >
> > a) Flink ML source release to be deployed to dist.apache.org
> >
> > b) Flink ML Python source distributions to be deployed to PyPI
> >
> > c) Maven artifacts to be deployed to the Maven Central Repository
> >
> >
> >
> > **Staging Areas to Review**
> >
> >
> >
> > The staging areas containing the above mentioned artifacts are as
> follows,
> > for your review:
> >
> > * All artifacts for a) and b) can be found in the corresponding dev
> > repository at dist.apache.org [2], which are signed with the key with
> > fingerprint CBE82BEFD827B08AFA843977EDBF922A7BC84897 [3]
> >
> > * All artifacts for c) can be found at the Apache Nexus Repository [4]
> >
> >
> >
> > Other links for your review:
> >
> > * JIRA release notes [5]
> >
> > * Source code tag "release-2.0.0-rc3" [6]
> >
> > * PR to update the website Downloads page to include Flink ML links [7]
> >
> >
> >
> > **Vote Duration**
> >
> >
> >
> > The voting time will run for at least 72 hours.
> >
> > It is adopted by majority approval, with at least 3 PMC affirmative
> votes.
> >
> >
> >
> > Thanks,
> >
> > Dong and Yun
> >
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release
> >
> > [2] https://dist.apache.org/repos/dist/dev/flink/flink-ml-2.0.0-rc3
> >
> > [3]  https://dist.apache.org/repos/dist/release/flink/KEYS
> >
> > [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1477
> >
> > [5]
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351079
> >
> > [6] https://github.com/apache/flink-ml/releases/tag/release-2.0.0-rc3
> >
> > [7] https://github.com/apache/flink-web/pull/493
> >
>


-- 
best,
Zhipeng


[jira] [Created] (FLINK-25494) Duplicate element serializer during DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources

2021-12-30 Thread Yun Tang (Jira)
Yun Tang created FLINK-25494:


 Summary: Duplicate element serializer during 
DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources
 Key: FLINK-25494
 URL: https://issues.apache.org/jira/browse/FLINK-25494
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Yun Tang
Assignee: Yun Tang
 Fix For: 1.15.0, 1.13.6, 1.14.3


Currently, during 
DefaultOperatorStateBackendSnapshotStrategy#syncPrepareResources, the array list 
serializer is copied via PartitionableListState#deepCopy. However, this just 
initializes another ArrayListSerializer and does not duplicate the internal state 
serializer:

 

See "{{internalListCopySerializer}}":
 
{code:java}
private PartitionableListState(
        RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo, ArrayList<S> internalList) {

    this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
    this.internalList = Preconditions.checkNotNull(internalList);
    this.internalListCopySerializer =
            new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
}
{code}
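
A minimal sketch of what a fix could look like (an assumption, not necessarily the actual change): duplicate the element serializer obtained from the meta info, e.g. via TypeSerializer#duplicate, so the copy no longer shares a potentially stateful serializer instance:
{code:java}
// Hypothetical fix sketch, reusing the names from the snippet above: duplicate
// the partition state serializer instead of sharing the same instance.
this.internalListCopySerializer =
        new ArrayListSerializer<>(
                stateMetaInfo.getPartitionStateSerializer().duplicate());
{code}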
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25493) Null date-type data causes SQL validation failure with reason null

2021-12-30 Thread kunghsu (Jira)
kunghsu created FLINK-25493:
---

 Summary: Null date-type data causes SQL validation failure with reason null
 Key: FLINK-25493
 URL: https://issues.apache.org/jira/browse/FLINK-25493
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.12.3
Reporter: kunghsu
 Attachments: image-2021-12-31-14-03-47-795.png

Null date-type data causes SQL validation failure with reason null

If a DATE-typed value in the data table is null, executing a SQL query raises the 
exception org.apache.flink.table.api.ValidationException: SQL validation 
failed. null

A NullPointerException then shows up in the "caused by" chain, as follows:

!image-2021-12-31-14-03-47-795.png!

 

Further investigation showed that NULL values in the DATE column of the table 
cause the behavior above.

Presumably the SQL execution cannot handle the null value; a missing null check 
leads to the NullPointerException.

The key problem is that such a data error should not be reported as "SQL 
validation failed. null".

This error message easily misleads users into thinking the SQL itself is wrong, 
while the SQL is actually correct.

 

Suggestion:

Improve the error message produced by this kind of null data to make the problem 
easier to locate.

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

2021-12-30 Thread Wei Zhong
Hi Xingbo,

Thanks for creating this FLIP. Big +1 for it!

I have some questions about the Thread Mode:

1. It seems that we dynamically load an embedded Python and user dependencies 
in the TM process. Can they be uninstalled cleanly after the task finishes? 
I.e. can we use the Thread Mode in session mode and in the PyFlink shell?

2. Does one TM have only one embedded Python running at the same time? If all 
the Python operators in the TM share the same PVM, will there be a loss in 
performance?

3. How do we load the relevant C library if the python.executable is provided 
by users? Could there be a risk of version conflicts?

Best,
Wei


> On Dec 29, 2021, at 11:56 AM, Xingbo Huang wrote:
> 
> Hi everyone,
> 
> I would like to start a discussion thread on "Support PyFlink Runtime
> Execution in Thread Mode"
> 
> We have provided PyFlink Runtime framework to support Python user-defined
> functions since Flink 1.10. The PyFlink Runtime framework is called Process
> Mode, which depends on an inter-process communication architecture based on
> the Apache Beam Portability framework. Although starting a dedicated
> process to execute Python user-defined functions could have better resource
> isolation, it will bring greater resource and performance overhead.
> 
> In order to overcome the resource and performance problems on Process Mode,
> we will propose a new execution mode which executes Python user-defined
> functions in the same thread instead of a separate process.
> 
> I have drafted the FLIP-206[1]. Please feel free to reply to this email
> thread. Looking forward to your feedback!
> 
> Best,
> Xingbo
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode



[jira] [Created] (FLINK-25492) Changelog with InMemoryStateChangelogStorage cannot work well

2021-12-30 Thread Hangxiang Yu (Jira)
Hangxiang Yu created FLINK-25492:


 Summary: Changelog with InMemoryStateChangelogStorage cannot work 
well
 Key: FLINK-25492
 URL: https://issues.apache.org/jira/browse/FLINK-25492
 Project: Flink
  Issue Type: Bug
  Components: Runtime / State Backends
Reporter: Hangxiang Yu
 Fix For: 1.15.0
 Attachments: changelog_in-memory_storage_bug.patch

When using ChangelogStateBackend with InMemoryStateChangelogStorage, the sequence 
number is increased when a record is appended. The materialization will 
materialize the record into the delegated state backend. But after the 
materialization has finished, the persist method still puts the record into the 
InMemoryChangelogStateHandle, which is incorrect.
Just as the patch shows, ChangelogDelegateHashMapInMemoryTest will fail because, 
on restore, the metadata cannot be found and a record that shouldn't be there is 
restored first.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25491) Code generation: init method exceeds 64 KB when there is a large IN filter with Table API

2021-12-30 Thread Daniel Cheng (Jira)
Daniel Cheng created FLINK-25491:


 Summary: Code generation: init method exceeds 64 KB when there is 
a large IN filter with Table API
 Key: FLINK-25491
 URL: https://issues.apache.org/jira/browse/FLINK-25491
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.14.2
Reporter: Daniel Cheng


When using the Table API, if you filter using an IN filter with a lot of 
values, e.g. `$(colName).in()`, it results in the error

```
Code of method "(...)V" of class "BatchExecCal$3006" grows beyond 64 KB
```

The size of the init method mainly comes from the below method, which 
initializes the hash set with all the values in the filter.

[https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala#L409]

 

This affects older versions as well, with 1.14.2 being the latest version that 
exhibits this issue.
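
For illustration only (not part of the original report): a Table API program of 
roughly this shape can trigger the problem once the IN list grows large enough. 
The table name, column name and values are hypothetical.
{code:java}
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class LargeInFilterExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Assumes a table named "my_table" with a STRING column "colName" is registered.
        Table source = tEnv.from("my_table");

        // Every literal ends up in the generated init method, which can push it
        // past the 64 KB JVM limit on method bytecode size.
        Object[] values = new Object[10_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = "value_" + i;
        }

        Table filtered = source.filter($("colName").in(values));
        filtered.execute().print();
    }
}
{code}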

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] Apache Flink ML Release 2.0.0, release candidate #3

2021-12-30 Thread Dong Lin
+1 (non-binding)

- Verified that the checksums and GPG files match the corresponding release
files
- Verified that the source distributions do not contain any binaries
- Built the source distribution with Maven to ensure all source files have
Apache headers
- Verified that all POM files point to the same version
- Verified that the README.md file does not have anything unexpected
- Verified that the NOTICE and LICENSE follow the rules specified in the wiki
<https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release>.
- Checked JIRA release notes
- Checked source code tag "release-2.0.0-rc2"
- Checked flink-web PR



On Fri, Dec 31, 2021 at 1:24 AM Yun Gao  wrote:

> Hi everyone,
>
>
>
> Please review and vote on the release candidate #3 for the version 2.0.0
> of Apache Flink ML,
>
> as follows:
>
> [ ] +1, Approve the release
>
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
>
> **Testing Guideline**
>
>
>
> You can find here [1] a page in the project wiki on instructions for
> testing. To cast a vote, it is not necessary to perform all listed checks,
> but please mention which checks you have performed when voting.
>
>
>
> **Release Overview**
>
>
>
> As an overview, the release consists of the following:
>
> a) Flink ML source release to be deployed to dist.apache.org
>
> b) Flink ML Python source distributions to be deployed to PyPI
>
> c) Maven artifacts to be deployed to the Maven Central Repository
>
>
>
> **Staging Areas to Review**
>
>
>
> The staging areas containing the above mentioned artifacts are as follows,
> for your review:
>
> * All artifacts for a) and b) can be found in the corresponding dev
> repository at dist.apache.org [2], which are signed with the key with
> fingerprint CBE82BEFD827B08AFA843977EDBF922A7BC84897 [3]
>
> * All artifacts for c) can be found at the Apache Nexus Repository [4]
>
>
>
> Other links for your review:
>
> * JIRA release notes [5]
>
> * Source code tag "release-2.0.0-rc3" [6]
>
> * PR to update the website Downloads page to include Flink ML links [7]
>
>
>
> **Vote Duration**
>
>
>
> The voting time will run for at least 72 hours.
>
> It is adopted by majority approval, with at least 3 PMC affirmative votes.
>
>
>
> Thanks,
>
> Dong and Yun
>
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release
>
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-ml-2.0.0-rc3
>
> [3]  https://dist.apache.org/repos/dist/release/flink/KEYS
>
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1477
>
> [5]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351079
>
> [6] https://github.com/apache/flink-ml/releases/tag/release-2.0.0-rc3
>
> [7] https://github.com/apache/flink-web/pull/493
>


[VOTE] Apache Flink ML Release 2.0.0, release candidate #3

2021-12-30 Thread Yun Gao
Hi everyone,
Please review and vote on the release candidate #3 for the version 2.0.0 of 
Apache Flink ML,
as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)
**Testing Guideline** 
You can find here [1] a page in the project wiki on instructions for testing. 
To cast a vote, it is not necessary to perform all listed checks, but please 
mention which checks you have performed when voting. 
**Release Overview**
As an overview, the release consists of the following:
a) Flink ML source release to be deployed to dist.apache.org
b) Flink ML Python source distributions to be deployed to PyPI
c) Maven artifacts to be deployed to the Maven Central Repository
**Staging Areas to Review**
The staging areas containing the above mentioned artifacts are as follows, for 
your review:
* All artifacts for a) and b) can be found in the corresponding dev repository 
at dist.apache.org [2], which are signed with the key with fingerprint 
CBE82BEFD827B08AFA843977EDBF922A7BC84897 [3]
* All artifacts for c) can be found at the Apache Nexus Repository [4]
Other links for your review:
* JIRA release notes [5]
* Source code tag "release-2.0.0-rc3" [6]
* PR to update the website Downloads page to include Flink ML links [7]
**Vote Duration**
The voting time will run for at least 72 hours.
It is adopted by majority approval, with at least 3 PMC affirmative votes.
Thanks,
Dong and Yun
[1] 
https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release
[2] https://dist.apache.org/repos/dist/dev/flink/flink-ml-2.0.0-rc3
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1477
[5] 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351079
[6] https://github.com/apache/flink-ml/releases/tag/release-2.0.0-rc3
[7] https://github.com/apache/flink-web/pull/493

[jira] [Created] (FLINK-25490) Update the Chinese document related to final checkpoint

2021-12-30 Thread Yun Gao (Jira)
Yun Gao created FLINK-25490:
---

 Summary: Update the Chinese document related to final checkpoint
 Key: FLINK-25490
 URL: https://issues.apache.org/jira/browse/FLINK-25490
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation, Runtime / Checkpointing
Affects Versions: 1.15.0
Reporter: Yun Gao
 Fix For: 1.15.0






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-201: Persist local state in working directory

2021-12-30 Thread Till Rohrmann
Hi David,

Thanks for your feedback.

With the graceful shutdown I mean a way to stop the TaskManager and to
clean up the working directory. At the moment, I think we always kill the
process via SIGTERM or SIGKILL. This won't clean up the working directory
because it could also originate from a process failure. I think what Niklas
does is to introduce a signal handler to react to SIGTERM to disconnect
from the JobMaster.

You are right that by default Flink will now set the RocksDB directory to
the working temp directory. Before it defaulted to the spilling
directories. I think this is not a problem because users can still manually
configure multiple RocksDB directories via state.backend.rocksdb.localdir.
Moreover, I am not sure how well this mechanism works in practice. Flink
will simply iterate through the directories when creating new RocksDB state
backends w/o a lot of smartness. If one of the directories is full, then
Flink won't use another one but simply fail.
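
As a rough illustration (paths are made up, not from this thread), explicitly
pinning RocksDB to several local directories would look something like this:

    // Sketch only (org.apache.flink.configuration.Configuration): set multiple
    // RocksDB local directories explicitly, comma-separated.
    Configuration conf = new Configuration();
    conf.setString(
            "state.backend.rocksdb.localdir",
            "/mnt/disk1/flink-rocksdb,/mnt/disk2/flink-rocksdb");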

I do see the point of a proper serialization format and I agree that we
should eventually implement it. My reasoning was that the PR is already
quite big and I would prefer getting it in and then tackling this problem
as a follow-up instead of increasing the scope of the changes further
because the serialization format is not required for this feature (strictly
speaking). I hope that this makes sense.

I will also respond to your PR comments.

Cheers,
Till

On Thu, Dec 30, 2021 at 4:00 PM David Morávek  wrote:

> Hi Till,
>
> thanks for drafting the FLIP, it looks really good. I did a quick pass over
> the PR and it seems to be heading in a right direction.
>
> It might be required to introduce a graceful shutdown of the TaskExecutor
> > in order to support proper cleanup of resources.
> >
>
> This is actively being worked on by Niklas in FLINK-25277 [1].
>
> In the PR, I've seen that you're also replacing directories for storing the
> local state with the working directory. Should this be a concern? Is for
> example rocksdb able to leverage multiple mount paths for spreading the
> load?
>
> I'd also be in favor of introducing a proper (evolving) serialization
> format right away instead of the Java serialization, but no hard feelings
> if we don't.
>
> [1] https://issues.apache.org/jira/browse/FLINK-25277
>
> Best,
> D.
>
> On Wed, Dec 29, 2021 at 4:58 PM Till Rohrmann 
> wrote:
>
> > I've created draft PR for the desired changes [1]. It might be easier to
> > take a look at than the branch.
> >
> > [1] https://github.com/apache/flink/pull/18237
> >
> > Cheers,
> > Till
> >
> > On Tue, Dec 28, 2021 at 3:22 PM Till Rohrmann 
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I would like to start a discussion about using the working directory to
> > > persist local state for faster recovery (FLIP-201) [1]. Persisting the
> > > local state will be beneficial if a crashed process is restarted with
> the
> > > same working directory. In this case, Flink does not have to download
> the
> > > state artifacts again and can recover locally.
> > >
> > > A POC can be found here [2].
> > >
> > > Looking forward to your feedback.
> > >
> > > [1] https://cwiki.apache.org/confluence/x/wJuqCw
> > > [2] https://github.com/tillrohrmann/flink/tree/FLIP-201
> > >
> > > Cheers,
> > > Till
> > >
> >
>


[jira] [Created] (FLINK-25489) failed at task: node and npm install when building flink from source in mac m1

2021-12-30 Thread jie han (Jira)
jie han created FLINK-25489:
---

 Summary: failed at task: node and npm install when building flink 
from source in mac m1
 Key: FLINK-25489
 URL: https://issues.apache.org/jira/browse/FLINK-25489
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.14.2
 Environment: macbook pro, apple m1 pro
Reporter: jie han


When I build Flink from source code on my M1 Mac, I hit the problem:

Failed to execute goal com.github.eirslett:frontend-maven-plugin with 
version 1.9.1

The error is: _Could not download Node.js: Got error code 404 from the server_

I found the solution on GitHub:

[https://github.com/eirslett/frontend-maven-plugin/issues/952]

Upgrading frontend-maven-plugin to version 1.11.0 fixes it.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [NOTICE] Table API is now Scala-free by introducing a flink-table-planner-loader

2021-12-30 Thread Till Rohrmann
This is really great news. Thanks a lot for all the effort Timo, Francesco
and everyone else who was involved! I believe that this will make it a lot
easier for our users to use any Scala version they want with Flink's
vanilla distribution :-)

Cheers,
Till

On Thu, Dec 30, 2021 at 4:03 PM David Morávek  wrote:

> Great job! This brings the scala-free effort close to the finish line!
>
> D.
>
> On Thu, Dec 30, 2021 at 3:08 PM Timo Walther  wrote:
>
> > Hi everyone,
> >
> > The new module flink-table-planner-loader replaces
> > flink-table-planner_2.12 and avoids the need for a specific Scala
> > version in downstream projects. It is included in the Flink distribution
> > under /lib. For backwards compatibility, users can still swap it with
> > flink-table-planner_2.12 located in /opt.
> >
> > As a consequence, flink-table-uber has been split into
> > flink-table-api-java-uber, flink-table-planner(-loader), and
> > table-runtime. flink-sql-client has no Scala suffix anymore.
> >
> > It is recommended to let new projects depend on
> > flink-table-planner-loader (without Scala suffix) + flink-table-runtime
> > in provided scope next to the API.
> >
> > Note that the distribution does not include the Scala API by default.
> > Scala users need to explicitly add a dependency to flink-table-api-scala
> > or flink-table-api-scala-bridge.
> >
> > We added a new README that can be used for future reference about all
> > modules that we provide:
> >
> > https://github.com/apache/flink/blob/master/flink-table/README.md
> >
> > To the best of our knowledge, currently only the Hive compatibility
> > layer needs the old flink-table-planner_2.12. Connectors developers and
> > user programs should be fine with flink-table-planner-loader.
> >
> > For more information see also:
> >
> > https://issues.apache.org/jira/browse/FLINK-25128
> >
> >
> > Regards,
> > Timo
> >
>


Re: [NOTICE] Table API is now Scala-free by introducing a flink-table-planner-loader

2021-12-30 Thread David Morávek
Great job! This brings the scala-free effort close to the finish line!

D.

On Thu, Dec 30, 2021 at 3:08 PM Timo Walther  wrote:

> Hi everyone,
>
> The new module flink-table-planner-loader replaces
> flink-table-planner_2.12 and avoids the need for a specific Scala
> version in downstream projects. It is included in the Flink distribution
> under /lib. For backwards compatibility, users can still swap it with
> flink-table-planner_2.12 located in /opt.
>
> As a consequence, flink-table-uber has been split into
> flink-table-api-java-uber, flink-table-planner(-loader), and
> table-runtime. flink-sql-client has no Scala suffix anymore.
>
> It is recommended to let new projects depend on
> flink-table-planner-loader (without Scala suffix) + flink-table-runtime
> in provided scope next to the API.
>
> Note that the distribution does not include the Scala API by default.
> Scala users need to explicitly add a dependency to flink-table-api-scala
> or flink-table-api-scala-bridge.
>
> We added a new README that can be used for future reference about all
> modules that we provide:
>
> https://github.com/apache/flink/blob/master/flink-table/README.md
>
> To the best of our knowledge, currently only the Hive compatibility
> layer needs the old flink-table-planner_2.12. Connectors developers and
> user programs should be fine with flink-table-planner-loader.
>
> For more information see also:
>
> https://issues.apache.org/jira/browse/FLINK-25128
>
>
> Regards,
> Timo
>


Re: [DISCUSS] FLIP-201: Persist local state in working directory

2021-12-30 Thread David Morávek
Hi Till,

thanks for drafting the FLIP, it looks really good. I did a quick pass over
the PR and it seems to be heading in the right direction.

It might be required to introduce a graceful shutdown of the TaskExecutor
> in order to support proper cleanup of resources.
>

This is actively being worked on by Niklas in FLINK-25277 [1].

In the PR, I've seen that you're also replacing directories for storing the
local state with the working directory. Should this be a concern? Is for
example rocksdb able to leverage multiple mount paths for spreading the
load?

I'd also be in favor of introducing a proper (evolving) serialization
format right away instead of the Java serialization, but no hard feelings
if we don't.

[1] https://issues.apache.org/jira/browse/FLINK-25277

Best,
D.

On Wed, Dec 29, 2021 at 4:58 PM Till Rohrmann  wrote:

> I've created draft PR for the desired changes [1]. It might be easier to
> take a look at than the branch.
>
> [1] https://github.com/apache/flink/pull/18237
>
> Cheers,
> Till
>
> On Tue, Dec 28, 2021 at 3:22 PM Till Rohrmann 
> wrote:
>
> > Hi everyone,
> >
> > I would like to start a discussion about using the working directory to
> > persist local state for faster recovery (FLIP-201) [1]. Persisting the
> > local state will be beneficial if a crashed process is restarted with the
> > same working directory. In this case, Flink does not have to download the
> > state artifacts again and can recover locally.
> >
> > A POC can be found here [2].
> >
> > Looking forward to your feedback.
> >
> > [1] https://cwiki.apache.org/confluence/x/wJuqCw
> > [2] https://github.com/tillrohrmann/flink/tree/FLIP-201
> >
> > Cheers,
> > Till
> >
>


[NOTICE] Table API is now Scala-free by introducing a flink-table-planner-loader

2021-12-30 Thread Timo Walther

Hi everyone,

The new module flink-table-planner-loader replaces 
flink-table-planner_2.12 and avoids the need for a specific Scala 
version in downstream projects. It is included in the Flink distribution 
under /lib. For backwards compatibility, users can still swap it with 
flink-table-planner_2.12 located in /opt.


As a consequence, flink-table-uber has been split into 
flink-table-api-java-uber, flink-table-planner(-loader), and 
table-runtime. flink-sql-client has no Scala suffix anymore.


It is recommended to let new projects depend on 
flink-table-planner-loader (without Scala suffix) + flink-table-runtime 
in provided scope next to the API.


Note that the distribution does not include the Scala API by default. 
Scala users need to explicitly add a dependency to flink-table-api-scala 
or flink-table-api-scala-bridge.


We added a new README that can be used for future reference about all 
modules that we provide:


https://github.com/apache/flink/blob/master/flink-table/README.md

To the best of our knowledge, currently only the Hive compatibility 
layer needs the old flink-table-planner_2.12. Connector developers and 
user programs should be fine with flink-table-planner-loader.


For more information see also:

https://issues.apache.org/jira/browse/FLINK-25128


Regards,
Timo


Re: [DISCUSS] FLIP-188: Introduce Built-in Dynamic Table Storage

2021-12-30 Thread Timo Walther

+1 for a separate repository. And also +1 for finding a good name.

`flink-warehouse` would be definitely a good marketing name but I agree 
that we should not start marketing for code bases. Are we planning to 
make this storage also available to DataStream API users? If not, I 
would also vote for `flink-managed-table` or better: `flink-table-store`


Thanks,
Timo



On 29.12.21 07:58, Jingsong Li wrote:

Thanks Till for your suggestions.

Personally, I like flink-warehouse, this is what we want to convey to
the user, but it indicates a bit too much scope.

How about just calling it flink-store?
Simply to convey an impression: this is flink's store project,
providing a built-in store for the flink compute engine, which can be
used by flink-table as well as flink-datastream.

Best,
Jingsong

On Tue, Dec 28, 2021 at 5:15 PM Till Rohrmann  wrote:


Hi Jingsong,

I think that developing flink-dynamic-storage as a separate sub project is
a very good idea since it allows us to move a lot faster and decouple
releases from Flink. Hence big +1.

Do we want to name it flink-dynamic-storage or shall we use a more
descriptive name? dynamic-storage sounds a bit generic to me and I wouldn't
know that this has something to do with letting Flink manage your tables
and their storage. I don't have a very good idea but maybe we can call it
flink-managed-tables, flink-warehouse, flink-olap or so.

Cheers,
Till

On Tue, Dec 28, 2021 at 9:49 AM Martijn Visser 
wrote:


Hi Jingsong,

That sounds promising! +1 from my side to continue development under
flink-dynamic-storage as a Flink subproject. I think having a more in-depth
interface will benefit everyone.

Best regards,

Martijn

On Tue, 28 Dec 2021 at 04:23, Jingsong Li  wrote:


Hi all,

After some experimentation, we felt no problem putting the dynamic
storage outside of flink, and it also allowed us to design the
interface in more depth.

What do you think? If there is no problem, I am asking for PMC's help
here: we want to propose flink-dynamic-storage as a flink subproject,
and we want to build the project under apache.

Best,
Jingsong


On Wed, Nov 24, 2021 at 8:10 PM Jingsong Li 
wrote:


Hi Stephan,

Thanks for your reply.

Data never expires automatically.

If there is a need for data retention, the user can choose one of the
following options:
- In the SQL for querying the managed table, users filter the data by
themselves

- Define the time partition, and users can delete the expired
partition by themselves. (DROP PARTITION ...)
- In the future version, we will support the "DELETE FROM" statement,
users can delete the expired data according to the conditions.

So to answer your question:


Will the VMQ send retractions so that the data will be removed from
the table (via compactions)?


The current implementation is not sending retraction, which I think
theoretically should be sent, currently the user can filter by
subsequent conditions.
And yes, the subscriber would not see strictly a correct result. I
think this is something we can improve for Flink SQL.


Do we want time retention semantics handled by the compaction?


Currently, no, Data never expires automatically.


Do we want to declare those types of queries "out of scope" initially?


I think we want users to be able to use three options above to
accomplish their requirements.

I will update FLIP to make the definition clearer and more explicit.

Best,
Jingsong

On Wed, Nov 24, 2021 at 5:01 AM Stephan Ewen 

wrote:


Thanks for digging into this.
Regarding this query:

INSERT INTO the_table
  SELECT window_end, COUNT(*)
    FROM (TUMBLE(TABLE interactions, DESCRIPTOR(ts), INTERVAL '5' MINUTES))
    GROUP BY window_end
    HAVING now() - window_end <= INTERVAL '14' DAYS;

I am not sure I understand what the conclusion is on the data retention
question, where the continuous streaming SQL query has retention semantics. I
think we would need to answer the following questions (I will call the query
that computed the managed table the "view materializer query" - VMQ).

(1) I guess the VMQ will send no updates for windows once the "retention
period" is over (14 days), as you said. That makes sense.

(2) Will the VMQ send retractions so that the data will be removed from the
table (via compactions)?
   - if yes, this seems semantically better for users, but it will be
expensive to keep the timers for retractions.
   - if not, we can still solve this by adding filters to queries against the
managed table, as long as these queries are in Flink.
   - any subscriber to the changelog stream would not see strictly a correct
result if we are not doing the retractions

(3) Do we want time retention semantics handled by the compaction?
   - if we say that we lazily apply the deletes in the queries that read the
managed tables, then we could also age out the old data during compaction.
   - that is cheap, but it might be too much of a special case to be very
relevant here.

(4) Do we want to declare those types of queries "out of scope" initially?

[jira] [Created] (FLINK-25488) Using a pipe symbol as pair delimiter in STR_TO_MAP in combination with concatenation results in broken output

2021-12-30 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-25488:
--

 Summary: Using a pipe symbol as pair delimiter in STR_TO_MAP in 
combination with concatenation results in broken output 
 Key: FLINK-25488
 URL: https://issues.apache.org/jira/browse/FLINK-25488
 Project: Flink
  Issue Type: Bug
Reporter: Martijn Visser


Reproducible using Flink Faker:

{code:sql}
-- Create source table
CREATE TABLE `customers` (
  `identifier` STRING,
  `fullname` STRING,
  `postal_address` STRING,
  `residential_address` STRING
) WITH (
  'connector' = 'faker',
  'fields.identifier.expression' = '#{Internet.uuid}',
  'fields.fullname.expression' = '#{Name.firstName} #{Name.lastName}',
  'fields.postal_address.expression' = '#{Address.fullAddress}',
  'fields.residential_address.expression' = '#{Address.fullAddress}',
  'rows-per-second' = '1'
);
{code}

{code:sql}
-- Doesn't generate expected output
SELECT 
  `identifier`,
  `fullname`,
  STR_TO_MAP('postal_address:' || postal_address || '|residential_address:' || 
residential_address,'|',':') AS `addresses`
FROM `customers`;
{code}

Output will look like:
{code:sql}
{=, A=null, C=null, D=null, L=null, O=null, P=null, S=null, T=null, _=null,  
=null, a=null, b=null, c=null, d=null, e=null, g=null, h=null, i=null, ,=null, 
l=null, -=null, m=null, .=null, n=null, o=null, p=null, q=null, 2=null, r=null, 
3=null, s=null, 4=null, t=null, 5=null, u=null, 6=null, v=null, 7=null, w=null, 
8=null, 9=null, |=null}
{code}

When using:
{code:sql}
-- Output looks like expected when using a different separator 
SELECT 
  `identifier`,
  `fullname`,
  STR_TO_MAP('postal_address:' || postal_address || ';residential_address:' || 
residential_address,';',':') AS `addresses`
FROM `customers`;
{code}

The output looks as expected:
{code:sql}
{postal_address=6654 Chong Meadows, East Lupita, CT 51702-8560, 
residential_address=Apt. 098 51845 Shields Fork, North Erikland, NV 10386}
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25487) Improve exception when flink-table-runtime is not in classpath

2021-12-30 Thread Timo Walther (Jira)
Timo Walther created FLINK-25487:


 Summary: Improve exception when flink-table-runtime is not in 
classpath
 Key: FLINK-25487
 URL: https://issues.apache.org/jira/browse/FLINK-25487
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Timo Walther


If flink-table-runtime is not in the classpath, the resulting exception can be 
quite cryptic:

{code}
Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/flink/calcite/shaded/com/jayway/jsonpath/spi/mapper/MappingProvider
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at org.apache.calcite.linq4j.tree.Types.lookupMethod(Types.java:314)
at org.apache.calcite.util.BuiltInMethod.(BuiltInMethod.java:646)
at 
org.apache.calcite.util.BuiltInMethod.(BuiltInMethod.java:345)
at 
org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.(RelMdPercentageOriginalRows.java:41)
at 
org.apache.calcite.rel.metadata.DefaultRelMetadataProvider.(DefaultRelMetadataProvider.java:42)
at 
org.apache.calcite.rel.metadata.DefaultRelMetadataProvider.(DefaultRelMetadataProvider.java:28)
at org.apache.calcite.plan.RelOptCluster.(RelOptCluster.java:91)
at org.apache.calcite.plan.RelOptCluster.create(RelOptCluster.java:100)
at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$.create(FlinkRelOptClusterFactory.scala:36)
at 
org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory.create(FlinkRelOptClusterFactory.scala)
at 
org.apache.flink.table.planner.delegation.PlannerContext.(PlannerContext.java:137)
at 
org.apache.flink.table.planner.delegation.PlannerBase.(PlannerBase.scala:113)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.(StreamPlanner.scala:55)
at 
org.apache.flink.table.planner.delegation.DefaultPlannerFactory.create(DefaultPlannerFactory.java:62)
at 
org.apache.flink.table.planner.loader.DelegatePlannerFactory.create(DelegatePlannerFactory.java:36)
at 
org.apache.flink.table.factories.PlannerFactoryUtil.createPlanner(PlannerFactoryUtil.java:53)
at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:123)
at 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:128)
at 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:96)
at com.ververica.Example_04_Table_ETL.main(Example_04_Table_ETL.java:18)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.calcite.shaded.com.jayway.jsonpath.spi.mapper.MappingProvider
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at 
org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromOwnerOnly(ComponentClassLoader.java:139)
at 
org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromComponentFirst(ComponentClassLoader.java:133)
at 
org.apache.flink.core.classloading.ComponentClassLoader.loadClass(ComponentClassLoader.java:95)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 25 more
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: Unable to update logback configuration in Flink Native Kubernetes

2021-12-30 Thread Sharon Xie
I've faced the same issue before.

I figured out that there is an internal configuration
`$internal.deployment.config-dir`
which allows me to specify a local folder that contains the logback config
in the file `logback-console.xml`. The content of the file is then used to
create the config map.

Hope it helps.


Sharon

On Wed, Dec 29, 2021 at 7:04 AM Raghavendar T S 
wrote:

> Hi
>
> I have created a Flink Native Kubernetes (1.14.2) cluster which is
> successful. I am trying to update the logback configuration for which I am
> using the configmap exposed by Flink Native Kubernetes. Flink Native
> Kubernetes is creating this configmap during the start of the cluster and
> deleting it when the cluster is stopped and this behavior is as per the
> official documentation.
>
> I updated the logback configmap which is also successful and this process
> even updates the actual logback files (conf folder) in the job manager and
> task manager. But Flink is not loading (hot reloading) this logback
> configuration.
>
> Also I want to make sure that the logback configmap configuration is
> persisted even during cluster restarts. But the Flink Native Kubernetes
> recreates the configmap each time the cluster is started.
>
> What is that I am missing here? How to make the updated logback
> configuration work?
>
>
> Thanks & Regards
> Raghavendar T S
>
>
> 
>


Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)

2021-12-30 Thread David Morávek
Hi all,

sorry for the late reply, vacation season ;) I'm still not 100% sold on
choosing the OC for this use-case, but on the other hand I don't have
strong arguments against it. Few more questions / thoughts:

We're still talking about the "web server based"
pattern_processor_discoverer, but what about other use cases? One of my big
concerns is that users cannot really reuse any part of the Flink
ecosystem to implement the discovery logic. For example if they want to
read patterns from Kafka topic, they need to roll their own discoverer
based on the vanilla Kafka client. If we're talking about extensibility,
should we also make sure that the existing primitives can be reused?

For instance, the example I gave in my previous email seems not easily
> achievable with side-input / broadcast streams: a single invalid pattern
> detected on a TM can be disabled elegantly globally without crashing the
> entire Flink job.


This can be done for the side-input as well by filtering invalid patterns
before the broadcast. You can also send the invalid patterns to any side
output you want. I have a feeling that we're way too attached to the REST
server use case in this discussion. I agree that for that case, this
solution is the most straightforward one.

OC is a 2-way communication mechanism, i.e. a subtask can also send
> OperatorEvent to the coordinator to report its owns status, so that the
> coordinator can act accordingly.
>

I agree that 2-way communication in the "data-flow like" API is tricky,
because it requires cycles / iterations, which are still not really solved
(for a good reason, it's really tough nut to crack). This makes me think
that the OC may be a bit of an "incomplete" workaround for not having fully
working support for iterations.

For example I'm not really confident that the checkpointing of the OC works
correctly right now, because it doesn't seem to require checkpoint barrier
alignment as the regular stream inputs do. We also don't have proper support
for watermarking (this is again tricky, because of the cycle).

If we decide to go down this road, should we first address some of these
limitations?

OperatorCoordinator will checkpoint the full amount of PatternProcessor
> data. For the reprocessing of historical data, you can read the
> PatternProcessor snapshots saved by this checkpoint from a certain
> historical checkpoint, and then recreate the historical data through these
> PatternProcessor snapshots.
>

If I understand that correctly, this means only the LATEST state of the
patterns (in other words - patterns that are currently in use). Is this
really sufficient for historical re-processing? Can someone for example
want to re-process the data in more of a "temporal join" fashion? Also, AFAIK
historical processing in combination with "coordinator checkpoints" is not
really something that we currently support out of the box; are there any plans
on tackling this (my other concern is that this should not go against the
"unified batch & stream processing" efforts)?

I do agree that having the user defined control logic defined in the JM
> increases the chance of instability.
>

I can imagine that if this should be a concern, we could move the execution
of the OC to the task managers. This also makes me think that we shouldn't
make any strong assumptions that the OC will always run in the JobManager
(this is especially relevant for the embedded web-server use case).

If an agreement is reached on OperatorCoodinator, I will start the voting
> thread.
>

As for the vote, I'd would be great if we can wait until the next week as
many people took vacation until end of the year.

Overall, I really like the feature, this will be a great addition to Flink.

Best,
D.



On Thu, Dec 30, 2021 at 11:27 AM Martijn Visser 
wrote:

> Hi all,
>
> I can understand the need for a control plane mechanism. I'm not the
> technical go-to person for questions on the OperatorCoordinator, but I
> would expect that we could offer those interfaces from Flink but shouldn't
> recommend running user-code in the JobManager itself. I think the user code
> (like a webserver) should run outside of Flink (like via a sidecar) and use
> only the provided interfaces to communicate.
>
> I would like to get @David Morávek  opinion on the
> technical part.
>
> Best regards,
>
> Martijn
>
> On Thu, 30 Dec 2021 at 10:07, Nicholas Jiang 
> wrote:
>
>> Hi Konstantin, Becket, Martijn,
>>
>> Thanks for sharing your feedback. What other concerns do you have about
>> OperatorCoodinator? If an agreement is reached on OperatorCoodinator, I
>> will start the voting thread.
>>
>> Best,
>> Nicholas Jiang
>>
>> On 2021/12/22 03:19:58 Becket Qin wrote:
>> > Hi Konstantin,
>> >
>> > Thanks for sharing your thoughts. Please see the reply inline below.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Tue, Dec 21, 2021 at 7:14 PM Konstantin Knauf 
>> wrote:
>> >
>> > > Hi Becket, Hi Nicholas,
>> > >
>> > > Thanks for joining the discussion.
>> 

[jira] [Created] (FLINK-25486) Perjob can not recover from checkpoint when zookeeper leader changes

2021-12-30 Thread Liu (Jira)
Liu created FLINK-25486:
---

 Summary: Perjob can not recover from checkpoint when zookeeper 
leader changes
 Key: FLINK-25486
 URL: https://issues.apache.org/jira/browse/FLINK-25486
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Reporter: Liu


When the config 
high-availability.zookeeper.client.tolerate-suspended-connections is at its 
default of false, the appMaster will fail over once the ZooKeeper leader 
changes. In this case, the old appMaster cleans up all the ZooKeeper info and 
the new appMaster will not recover from the latest checkpoint.

The process is as follows:
 # Start a per-job application.
 # Kill ZooKeeper's leader node, which causes the per-job application to suspend.
 # In MiniDispatcher's function jobReachedTerminalState, shutDownFuture is set 
to UNKNOWN.
 # The future is transferred to ClusterEntrypoint, and the method is called with 
cleanupHaData set to true.
 # ZooKeeper data is cleaned up and the process exits.
 # The new appMaster will not find any checkpoints to start from and the state 
is lost.

Since the job can recover automatically when the ZooKeeper leader changes, it is 
reasonable to keep the ZooKeeper info for the upcoming recovery.
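
As a possible mitigation (an assumption, not a fix for the cleanup logic itself), 
the suspension-triggered failover can be avoided by tolerating suspended 
ZooKeeper connections:
{code:java}
// Mitigation sketch only (org.apache.flink.configuration.Configuration):
// tolerate SUSPENDED ZooKeeper connections so a leader change does not
// immediately fail over the appMaster and wipe the HA data.
Configuration conf = new Configuration();
conf.setBoolean("high-availability.zookeeper.client.tolerate-suspended-connections", true);
{code}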

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

2021-12-30 Thread Gen Luo
Hi Xuannan,

I found FLIP-188 [1], which aims to introduce a built-in dynamic table
storage that provides a unified changelog & table representation. Tables
stored there can be used in further ad-hoc queries. To my understanding,
it's quite like an implementation of caching in the Table API, and the ad-hoc
queries are somewhat like further steps in an interactive program.

As you replied, caching at Table/SQL API is the next step, as a part of
interactive programming in Table API, which we all agree is the major
scenario. What do you think about the relation between it and FLIP-188?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage


On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su  wrote:

> Hi David,
>
> Thanks for sharing your thoughts.
>
> You are right that most people tend to use high-level API for
> interactive data exploration. Actually, there is
> the FLIP-36 [1] covering the cache API at Table/SQL API. As far as I
> know, it has been accepted but hasn’t been implemented. At the time
> when it is drafted, DataStream did not support Batch mode but Table
> API does.
>
> Now that the DataStream API does support batch processing, I think we
> can focus on supporting cache at DataStream first. It is still
> valuable for DataStream users and most of the work we do in this FLIP
> can be reused. So I want to limit the scope of this FLIP.
>
> After caching is supported at DataStream, we can continue from where
> FLIP-36 left off to support caching at Table/SQL API. We might have to
> re-vote FLIP-36 or draft a new FLIP. What do you think?
>
> Best,
> Xuannan
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>
>
>
> On Wed, Dec 29, 2021 at 6:08 PM David Morávek  wrote:
> >
> > Hi Xuannan,
> >
> > thanks for drafting this FLIP.
> >
> > One immediate thought, from what I've seen for interactive data
> exploration
> > with Spark, most people tend to use the higher level APIs, that allow for
> > faster prototyping (Table API in Flink's case). Should the Table API also
> > be covered by this FLIP?
> >
> > Best,
> > D.
> >
> > On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su 
> wrote:
> >
> > > Hi devs,
> > >
> > > I’d like to start a discussion about adding support to cache the
> > > intermediate result at DataStream API for batch processing.
> > >
> > > As the DataStream API now supports batch execution mode, we see users
> > > using the DataStream API to run batch jobs. Interactive programming is
> > > an important use case of Flink batch processing. And the ability to
> > > cache intermediate results of a DataStream is crucial to the
> > > interactive programming experience.
> > >
> > > Therefore, we propose to support caching a DataStream in Batch
> > > execution. We believe that users can benefit a lot from the change and
> > > encourage them to use DataStream API for their interactive batch
> > > processing work.
> > >
> > > Please check out the FLIP-205 [1] and feel free to reply to this email
> > > thread. Looking forward to your feedback!
> > >
> > > [1]
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> > >
> > > Best,
> > > Xuannan
> > >
>


Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

2021-12-30 Thread Zhipeng Zhang
Hi Xuannan,

Thanks for starting the discussion. This would certainly help a lot on both
efficiency and reproducibility in machine learning cases :)

I have a few questions as follows:

1. Can we support caching both the output and side outputs of a
SingleOutputStreamOperator (which I believe is a reasonable use case),
given that  `cache()` is defined on `SingleOutputStreamOperator`?

If not, shall we introduce another class, say
"CachedSingleOutputStreamOperator", which extends
SingleOutputStreamOperator and overrides the getSideOutput method and
return CachedDataStream?

2. Is there any chance that we also support cache in Stream Mode if the
SingleOutputStreamOperator is bounded? We may also want to run batch jobs
in Stream Mode. Could you add some discussion of this to the FLIP?

3. What are we going to do if users change the parallelism of
CachedDataStream? Shall we throw an exception or add a new operator when
translating the job graph?

Two typos:
1.  ...a stream node with the sample parallelism as its input is added to
the StreamGraph
---> "the same parallelism"
2. In figure of Job1, one-input transformation
---> MapTransformation

Best,
Zhipeng


David Morávek wrote on Wed, Dec 29, 2021 at 18:08:

> Hi Xuannan,
>
> thanks for drafting this FLIP.
>
> One immediate thought, from what I've seen for interactive data exploration
> with Spark, most people tend to use the higher level APIs, that allow for
> faster prototyping (Table API in Flink's case). Should the Table API also
> be covered by this FLIP?
>
> Best,
> D.
>
> On Wed, Dec 29, 2021 at 10:36 AM Xuannan Su  wrote:
>
> > Hi devs,
> >
> > I’d like to start a discussion about adding support to cache the
> > intermediate result at DataStream API for batch processing.
> >
> > As the DataStream API now supports batch execution mode, we see users
> > using the DataStream API to run batch jobs. Interactive programming is
> > an important use case of Flink batch processing. And the ability to
> > cache intermediate results of a DataStream is crucial to the
> > interactive programming experience.
> >
> > Therefore, we propose to support caching a DataStream in Batch
> > execution. We believe that users can benefit a lot from the change and
> > encourage them to use DataStream API for their interactive batch
> > processing work.
> >
> > Please check out the FLIP-205 [1] and feel free to reply to this email
> > thread. Looking forward to your feedback!
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-205%3A+Support+Cache+in+DataStream+for+Batch+Processing
> >
> > Best,
> > Xuannan
> >
>


-- 
best,
Zhipeng


Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)

2021-12-30 Thread Martijn Visser
Hi all,

I can understand the need for a control plane mechanism. I'm not the
technical go-to person for questions on the OperatorCoordinator, but I
would expect that we could offer those interfaces from Flink but shouldn't
recommend running user-code in the JobManager itself. I think the user code
(like a webserver) should run outside of Flink (like via a sidecar) and use
only the provided interfaces to communicate.

I would like to get @David Morávek  opinion on the
technical part.

Best regards,

Martijn

On Thu, 30 Dec 2021 at 10:07, Nicholas Jiang 
wrote:

> Hi Konstantin, Becket, Martijn,
>
> Thanks for sharing your feedback. What other concerns do you have about
> OperatorCoodinator? If an agreement is reached on OperatorCoodinator, I
> will start the voting thread.
>
> Best,
> Nicholas Jiang
>
> On 2021/12/22 03:19:58 Becket Qin wrote:
> > Hi Konstantin,
> >
> > Thanks for sharing your thoughts. Please see the reply inline below.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Dec 21, 2021 at 7:14 PM Konstantin Knauf 
> wrote:
> >
> > > Hi Becket, Hi Nicholas,
> > >
> > > Thanks for joining the discussion.
> > >
> > > 1 ) Personally, I would argue that we should only run user code in the
> > > Jobmanager/Jobmaster if we can not avoid it. It seems wrong to me to
> > > encourage users to e.g. run a webserver on the Jobmanager, or
> continuously
> > > read patterns from a Kafka Topic on the Jobmanager, but both of these
> I see
> > > happening with the current design. We've had lots of issues with
> > > classloading leaks and other stability issues on the Jobmanager and
> making
> > > this more complicated, if there is another way, seems unnecessary.
> >
> >
> > I think the key question here is what primitive does Flink provide to
> > facilitate the user implementation of their own control logic / control
> > plane? It looks that previously, Flink assumes that all the user logic is
> > just data processing logic without any control / coordination
> requirements.
> > However, it turns out that a decent control plane abstraction is required
> > in association with the data processing logic in many cases, including
> > Source / Sink and other user defined operators in general. The fact that
> we
> > ended up with adding the SplitEnumerator and GlobalCommitter are just two
> > examples of the demand of such coordination among user defined logics.
> > There are other cases that we see in ecosystem projects, such as
> > deep-learning-on-flink[1]. Now we see this again in CEP.
> >
> > Such control plane primitives are critical to the extensibility of a
> > project. If we look at other projects, exposing such control plane logic
> is
> > quite common. For example, Hadoop ended up with exposing YARN as a public
> > API to the users, which is extremely popular. Kafka consumers exposed the
> > consumer group rebalance logic to the users via
> ConsumerPartitionAssigner,
> > which is also a control plane primitive.
> >
> > To me it is more important to think about how we can improve the
> stability
> > of such a control plane mechanism, instead of simply saying no to the
> users.
> >
> >
> >
> >
> > > 2) In addition, I suspect that, over time we will have to implement
> all the
> > > functionality that regular sources already provide around consistency
> > > (watermarks, checkpointing) for the PatternProcessorCoordinator, too.
> >
> >
> > I think OperatorCoordinator should have a generic communication mechanism
> > for all the operators, not specific to Source. We should probably have an
> > AbstractOperatorCoordinator help dealing with the communication layer,
> and
> > leave the state maintenance and event handling logic to the user code.
> >
> >
> > > 3) I understand that running on the Jobmanager is easier if you want to
> > > launch a REST server directly. Here my question would be: does this
> > > really need to be solved inside of Flink or couldn't you start a
> > > webserver next to Flink? If we start using the Jobmanager as a REST
> > > server users will expect that e.g. it is highly available and can be
> > > load balanced and we quickly need to think about aspects that we never
> > > wanted to think about in the context of a Flink Jobmanager.
> > >
> >
> > I think the REST API is just for receiving commands targeting a running
> > Flink job. If the job fails, the REST API would be useless.
> >
> >
> > > So, can you elaborate a bit more, why a side-input/broadcast stream is
> > >
> > > a) more difficult
> > > b) has vague semantics (To me semantics of a stream-stream seem clearer
> > > when it comes to out-of-orderness, late data, reprocessing or batch
> > > execution mode.)
> >
> >
> > I do agree that having the user defined control logic defined in the JM
> > increases the chance of instability. In that case, we may think of other
> > solutions and I am fully open to that. But the side-input / broadcast
> > stream seems more like a bandaid instead of a carefully designed control
> > plane mechanism.

[jira] [Created] (FLINK-25485) JDBC connector implicitly add options when use mysql

2021-12-30 Thread Ada Wong (Jira)
Ada Wong created FLINK-25485:


 Summary: JDBC connector implicitly add options when use mysql
 Key: FLINK-25485
 URL: https://issues.apache.org/jira/browse/FLINK-25485
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Affects Versions: 1.14.2
Reporter: Ada Wong


When we use the MySQL sink directly, the buffer-flush options
(sink.buffer-flush.max-rows, sink.buffer-flush.interval) cannot
increase throughput.

We must add 'rewriteBatchedStatements=true' to the MySQL JDBC URL for
'sink.buffer-flush' to take effect.

I want to add a method appendUrlSuffix to MySQLDialect to set this JDBC
option automatically.

Many users forget this option or do not know about it.

 

This is inspired by Alibaba's DataX:

https://github.com/alibaba/DataX/blob/master/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java
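
A rough sketch of the proposed behaviour (the appendUrlSuffix name and the
standalone helper class below are hypothetical, not the actual MySQLDialect
API):

{code:java}
// Hypothetical illustration of the proposed appendUrlSuffix behaviour.
public final class MySqlUrlSuffixSketch {

    private static final String REWRITE_OPTION = "rewriteBatchedStatements=true";

    /** Appends rewriteBatchedStatements=true unless the user already configured it. */
    public static String appendUrlSuffix(String url) {
        if (url.toLowerCase().contains("rewritebatchedstatements")) {
            return url; // respect an explicit user setting
        }
        return url + (url.contains("?") ? "&" : "?") + REWRITE_OPTION;
    }

    public static void main(String[] args) {
        // Prints: jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true
        System.out.println(appendUrlSuffix("jdbc:mysql://localhost:3306/test"));
    }
}
{code}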



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)

2021-12-30 Thread Nicholas Jiang
Hi Konstantin, Becket, Martijn,

Thanks for sharing your feedback. What other concerns do you have about 
OperatorCoordinator? If an agreement is reached on OperatorCoordinator, I will 
start the voting thread.

Best,
Nicholas Jiang

On 2021/12/22 03:19:58 Becket Qin wrote:
> Hi Konstantin,
> 
> Thanks for sharing your thoughts. Please see the reply inline below.
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> On Tue, Dec 21, 2021 at 7:14 PM Konstantin Knauf  wrote:
> 
> > Hi Becket, Hi Nicholas,
> >
> > Thanks for joining the discussion.
> >
> > 1 ) Personally, I would argue that we should only run user code in the
> > Jobmanager/Jobmaster if we can not avoid it. It seems wrong to me to
> > encourage users to e.g. run a webserver on the Jobmanager, or continuously
> > read patterns from a Kafka Topic on the Jobmanager, but both of these I see
> > happening with the current design. We've had lots of issues with
> > classloading leaks and other stability issues on the Jobmanager and making
> > this more complicated, if there is another way, seems unnecessary.
> 
> 
> I think the key question here is what primitive Flink provides to
> facilitate the user implementation of their own control logic / control
> plane. It seems that, previously, Flink assumed that all user logic is
> just data processing logic without any control / coordination requirements.
> However, it turns out that a decent control plane abstraction is required
> in association with the data processing logic in many cases, including
> Source / Sink and other user defined operators in general. The
> SplitEnumerator and GlobalCommitter we ended up adding are just two
> examples of the demand for such coordination among user defined logic.
> There are other cases that we see in ecosystem projects, such as
> deep-learning-on-flink[1]. Now we see this again in CEP.
> 
> Such control plane primitives are critical to the extensibility of a
> project. If we look at other projects, exposing such control plane logic is
> quite common. For example, Hadoop ended up exposing YARN as a public
> API to the users, which is extremely popular. Kafka consumers exposed the
> consumer group rebalance logic to the users via ConsumerPartitionAssigner,
> which is also a control plane primitive.
> 
> To me it is more important to think about how we can improve the stability
> of such a control plane mechanism, instead of simply saying no to the users.
> 
> 
> 
> 
> > 2) In addition, I suspect that, over time we will have to implement all the
> > functionality that regular sources already provide around consistency
> > (watermarks, checkpointing) for the PatternProcessorCoordinator, too.
> 
> 
> I think OperatorCoordinator should have a generic communication mechanism
> for all the operators, not specific to Source. We should probably have an
> AbstractOperatorCoordinator to help deal with the communication layer, and
> leave the state maintenance and event handling logic to the user code.
> 
> 
> > 3) I understand that running on the Jobmanager is easier if you want to
> > launch a REST server directly. Here my question would be: does this really
> > need to be solved inside of Flink or couldn't you start a webserver next to
> > Flink? If we start using the Jobmanager as a REST server users will expect
> > that e.g. it is highly available and can be load balanced and we quickly
> > need to think about aspects that we never wanted to think about in the
> > context of a Flink Jobmanager.
> >
> 
> I think the REST API is just for receiving commands targeting a running
> Flink job. If the job fails, the REST API would be useless.
> 
> 
> > So, can you elaborate a bit more, why a side-input/broadcast stream is
> >
> > a) more difficult
> > b) has vague semantics (To me semantics of a stream-stream seem clearer
> > when it comes to out-of-orderness, late data, reprocessing or batch
> > execution mode.)
> 
> 
> I do agree that having the user defined control logic defined in the JM
> increases the chance of instability. In that case, we may think of other
> solutions and I am fully open to that. But the side-input / broadcast
> stream seems more like a bandaid instead of a carefully designed control
> plane mechanism.
> 
> A decent control plane requires two-way communication, so information can
> be reported / collected from the entity being controlled, and the
> coordinator / controller can send decisions or commands to the entities
> accordingly, just like our TM / JM communication. IIUC, this is not
> achievable with the existing side-input / broadcast stream as both of them
> are one-way communication mechanisms. For instance, the example I gave in
> my previous email does not seem easily achievable with side-input /
> broadcast streams: a single invalid pattern detected on a TM can be
> disabled globally and elegantly without crashing the entire Flink job.
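
(For illustration, a minimal sketch of the two-way interaction described
above. The types below are simplified placeholders and do not represent the
actual Flink OperatorCoordinator API; all names and signatures here are
assumptions made for the sake of the example.)

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Placeholder for a control-plane message exchanged between TMs and the coordinator. */
interface ControlEvent {}

/** Sent by a subtask when it detects a pattern it cannot compile or apply. */
class InvalidPatternReport implements ControlEvent {
    final String patternId;
    InvalidPatternReport(String patternId) { this.patternId = patternId; }
}

/** Broadcast by the coordinator to switch a pattern off on every subtask. */
class DisablePatternCommand implements ControlEvent {
    final String patternId;
    DisablePatternCommand(String patternId) { this.patternId = patternId; }
}

/** Placeholder for the per-subtask channel the coordinator uses to push commands. */
interface SubtaskGateway {
    void sendEvent(ControlEvent event);
}

/** Coordinator side: collects reports from subtasks and pushes decisions back. */
class PatternCoordinatorSketch {
    private final Map<Integer, SubtaskGateway> gateways = new HashMap<>();

    void subtaskReady(int subtask, SubtaskGateway gateway) {
        gateways.put(subtask, gateway);
    }

    /** One TM reports an invalid pattern ... */
    void handleEventFromOperator(int subtask, ControlEvent event) {
        if (event instanceof InvalidPatternReport) {
            String id = ((InvalidPatternReport) event).patternId;
            // ... and the coordinator disables it everywhere instead of failing the job.
            gateways.values().forEach(g -> g.sendEvent(new DisablePatternCommand(id)));
        }
    }
}
{code}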
> 
> 
> > Cheers,
> >
> > Konstantin
> >
> >
> > On Tue, Dec 21, 2021 at 11:38 AM Becket Qin