[jira] [Created] (FLINK-25553) Remove MapR filesystem

2022-01-05 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-25553:
--

 Summary: Remove MapR filesystem
 Key: FLINK-25553
 URL: https://issues.apache.org/jira/browse/FLINK-25553
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / FileSystem
Reporter: Martijn Visser


Pending a positive outcome of the discussion on the Dev mailing list 
https://lists.apache.org/thread/od2137fk5j1gq034sopj5n2th2w719w4, we can remove 
the MapR filesystem.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25552) Support MinMaxScaler in FlinkML

2022-01-05 Thread weibo zhao (Jira)
weibo zhao created FLINK-25552:
--

 Summary: Support MinMaxScaler in FlinkML
 Key: FLINK-25552
 URL: https://issues.apache.org/jira/browse/FLINK-25552
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Reporter: weibo zhao


Support MinMaxScaler in FlinkML



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)

2022-01-05 Thread Till Rohrmann
Hi Becket,

I might be missing something but having to define interfaces/formats for
the CEP patterns should be necessary for either approach. The OC approach
needs to receive and understand the pattern data from somewhere as well and
will probably also have to deal with evolving formats. Hence, I believe
that this work wouldn't be wasted.

I might misjudge the willingness of our users to do some extra setup work,
but I think that some of them would already be happy with step 1.

My understanding so far was that the OC approach also requires the CEP
operator infrastructure (making the operator accept new patterns) work that
I proposed to do as a first step. The only difference is where the new
patterns/commands are coming from. If we design this correctly, then this
should change only very little depending on whether you read from a
side-input or from an OC.

Another benefit of downscoping the FLIP is to make it more realistic to be
completed. Smaller and incremental steps are usually easier to realize. If
we now say that this FLIP requires a general purpose user controlled
control plane that gives you hard guarantees, then I am pretty sure that
this will take at least half a year.

Cheers,
Till

On Thu, Jan 6, 2022 at 4:45 AM Becket Qin  wrote:

> Thanks for the explanation, Till. I like the idea, but have a question
> about the first step.
>
> After the first step, would users be able to actually use the dynamic
> patterns in CEP?
>
> In the first step you mentioned, the commands and formats for a new CEP
> pattern seem related to how users would ingest the commands. If we go with
> the OC, these commands and formats would become internal interfaces. The
> end users would just use the REST API, or in the beginning, implement a
> Java plugin of dynamic pattern provider. In this case our focus would be on
> designing a good plugin interface. On the other hand, if we go with the
> side-input, users would need to know the command format so they can send
> the commands to the CEP operator. Therefore we need to think about stuff
> like versioning, request validation and backwards compatibility.
>
> Also, because the public interface is all about how the users can ingest
> the dynamic patterns, it seems we still need to figure that out before we
> can conclude the FLIP.
>
> Assuming the first step closes this FLIP and after that users would be able
> to use the CEP dynamic pattern, are you suggesting the following?
>
> 1. We will design the commands and format of CEP dynamic pattern, and also
> how CEP operators take them into effect. This design would assume that
> users can send the commands directly to the CEP operator via side-input. So
> the protocol would take versioning and format evolution into account. After
> the first step, the users CAN make dynamic pattern work with the help from
> some external dependencies and maintenance effort.
>
> 2. Discuss a control plane from the use case of CEP dynamic pattern,
> and let CEP dynamic pattern use that control plane if we eventually think
> that is the right way to go.
>
> Assuming that we are not going to stop after step 1 is done, will the end
> state be that the CEP dynamic pattern supports both approaches? It is not
> clear to me how this would work. For example, will a pattern ingested from
> side-input be managed by OC as well? If more and more users pick the more
> ergonomic way, would the side-input option just die out? In that case, will
> we deprecate that?
>
> My main concern is that the time and work we invest in step 1 will be
> thrown away. Not only that, at the end of step 1, in order to actually use
> the feature and make it production ready, there is a lot of work for the
> users to do. All that work may also be in vain. So it would be good if
> we can avoid that by having a clean approach to begin with. What do you
> think?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Wed, Jan 5, 2022 at 11:35 PM Till Rohrmann 
> wrote:
>
> > I think I would scope the effort slightly differently. Note that I might
> be
> > missing some requirements or overlook something.
> >
> > 1. Enable Flink to support CEP dynamic patterns
> >
> > Here I would define the commands and formats for a new CEP pattern. Then
> I
> > would extend the CEP operator to understand these commands so that we can
> > change the CEP patterns dynamically. This should give the building blocks
> > to set up a job where you can change the CEP patterns by reading from an
> > arbitrary side input the CEP pattern commands. Note that it will be the
> > responsibility of the user to ingest the data somehow. This is nothing
> > Flink is concerned with at this time.
> >
> > 2. Think about how to make the CEP dynamic pattern feature more ergonomic
> >
> > Here we can pick up the discussion about OC and having a REST ingestion
> > endpoint in Flink vs. offering tools that live outside of Flink itself. I
> > could imagine that this could become a separate small project that builds
> > 

Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

2022-01-05 Thread Xuannan Su
Hi Yun,

Thanks for your feedback!

1. With the cached stream, compilation and job submission happen as in a
regular job submission, and a job with multiple concurrent cached
DataStreams is supported. For your example, a and b run in the same
job; thus, all the cached DataStreams are created when the job
completes. (A rough sketch illustrating this follows after point 4.)

2. If I understand your question correctly, this wouldn’t be a problem
if we support concurrent cached DataStream in a job.

3. Yes, the execution would have the same compile result regardless of
the deployment mode. If the user tries to run multiple batch
jobs that use cache in one StreamExecutionEnvironment with Per-job
deployment mode, the cache-consuming job will fail and we go through
the failover procedure to re-submit the job with the original graph, as if
the cache hadn't been created.
We could do better if we knew the deployment mode upfront and disabled
the caching for Per-job mode. Maybe we can check the
`execution.target` option to see if it is Per-Job mode. What do you
think?

4. This is a good question. And I can imagine a use case where users
want to process some bounded sources and cache the intermediate
result, verify the result, and then use it later for a Stream job.
Batch mode is required when creating the cache so that we know the job
will finish and the cache can be reused. When consuming the cache, it
could be in either Batch mode or Stream mode. In stream mode, it
behaves differently when the cached DataStream hasn't been created or
is invalid: it should compute the intermediate result from scratch but
should not cache the intermediate result.
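
To make the points above more concrete, here is a rough sketch of how this
could look from the user's side. Note that cache() and CachedDataStream are
the names proposed in FLIP-205 and do not exist in Flink today, so the whole
example is hypothetical and may change as the discussion evolves:

// Hypothetical sketch of the FLIP-205 proposal; cache() / CachedDataStream
// are proposed API, not existing Flink API.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH); // caches are created in batch mode

DataStream<Long> a = env.fromSequence(1, 100).map(x -> x * 2).cache();
DataStream<Long> b = env.fromSequence(1, 100).filter(x -> x > 50).cache();

// Point 1: a and b are part of the same job; both caches are created once
// that job completes.
a.union(b).print();
env.execute("create caches");

// A later job in the same application reuses the cached intermediate results
// instead of recomputing them. If a cache is lost (e.g. a TM failure without
// a remote shuffle service) or the job runs in Per-job mode, the consuming
// job falls back to recomputing from the original graph (point 3).
a.keyBy(x -> x % 2).reduce(Long::sum).print();
env.execute("consume caches");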

For remote shuffle service, I think it is fine if the current
design is aligned with remote shuffle service. For any work that is
required for remote shuffle service to work with caching, I am more
than happy to help.

Best,
Xuannan



On Wed, Jan 5, 2022 at 4:49 PM Yun Gao  wrote:
>
> Hi Xuannan,
>
> Very thanks for drafting the FLIP and initiating the discussion!
>
> I have several issues, sorry if I have misunderstandings:
>
> 1. With the cached stream, when would the compile and job submission
> happen? Does it happen on calling execute_and_cache()? If so, could
> we support a job with multiple concurrent cached streams like
>
> DataStream a = ...
> DataStream b = ...
> a.cache()
> b.cache()
> // could we run a/b in a same job ?
>
> If not, perhaps we might have parts of the graph that would not be executed?
>
> 2. If the part of the graph using the cached partition is executed as a second
> job, would the job be executed after its precedent jobs have finished?
> Would the StreamExecutionEnvironment do this tracking?
>
> 3. Would the execution have the same compile result when running in
> per-job vs. application / session mode? For per-job mode, when
> executing the part of the graph that uses the cached result, we might need
> to run the whole graph from the sources; but for application / session mode,
> it would be
> compiled to a separate job reading from the cached result partitions. If the
> compile result is different, perhaps we currently could not get the execution
> mode when compiling?
>
> 4. For the part of the graph using the cached result, do we support the stream
> case? For example, a part of the graph that has two sources, where one source is a
> cached result partition and the other one is an unbounded source.
>
> For the remote shuffle service, it seems to me we do not yet have a
> complete process for it to support the cached ResultPartition, since
> in JobMasterPartitionTrackerImpl we do not support promoting a result
> partition via a pluggable ShuffleMaster yet. But we should be able to further
> complete this part.
>
> Best,
> Yun
>
>
> --
> From:Xuannan Su 
> Send Time:2022 Jan. 5 (Wed.) 14:04
> To:dev 
> Subject:Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch 
> Processing
>
> Hi David,
>
> We have a description in the FLIP about the case of TM failure without
> the remote shuffle service. Basically, since the partitions are stored
> at the TM, a TM failure requires recomputing the intermediate result.
>
> If a Flink job uses the remote shuffle service, the partitions are
> stored at the remote shuffle service. In this case, the failure of TM
> will not cause any partition loss. Therefore, recomputing the
> intermediate result is not required. In case of partition loss at the
> remote shuffle service, even without a TM failure, the cached
> intermediate result is not valid anymore, so the intermediate result
> has to be recomputed.
>
> To summarize, no matter where the partitions are stored, locally at TM
> or remotely at remote shuffle service, recomputing is only required if
> the consuming job finds some partitions lost.
>
> I updated the FLIP to include the description of failover when using
> remote shuffle service.
>
> Best,
> Xuannan
>
>
> On Mon, Jan 3, 2022 at 4:17 PM David Morávek  wrote:
> >
> > One more 

[RESULT] [VOTE] Apache Flink ML Release 2.0.0, release candidate #3

2022-01-05 Thread Yun Gao
I'm happy to announce that we have unanimously approved this release.

There are 6 approving votes, 3 of which are binding:
* Dong Lin (non-binding)
* Zhipeng Zhang (non-binding)
* Xingbo Huang (non-binding)
* Till Rohrmann  (binding)
* Dian Fu  (binding)
* Becket Qin (binding)

There are no disapproving votes.

Thanks everyone!

Re: Re: [VOTE] Apache Flink ML Release 2.0.0, release candidate #3

2022-01-05 Thread Yun Gao
Very thanks everyone for the verification!

I'll announce the result in a separate thread.

Best,
Yun

--
Sender:Becket Qin
Date:2022/01/05 23:16:26
Recipient:dev
Cc:Yun Gao
Subject: Re: [VOTE] Apache Flink ML Release 2.0.0, release candidate #3

+1 (binding)

- Verified the checksum and signature
- Built java code and ran all the tests
- Installed python packages according to the instructions in README.md (there 
are some dependency conflicts but it looks like they are due to my local environment 
issues.)

Regards,

Jiangjie (Becket) Qin
On Wed, Jan 5, 2022 at 8:51 PM Dian Fu  wrote:
+1 (binding)

 - Verified the checksum and signature
 - Build the Java code and also run the tests using `mvn clean verify`
 - Checked the NOTICE file
 - Pip installed the python package in MacOS under Python 3.7
 - Reviewed the flink-web PR

 Regards,
 Dian

 On Tue, Jan 4, 2022 at 12:25 AM Dong Lin  wrote:

 > Thank you Till for the vote.
 >
 > FYI, if you were not able to use `pip3 install` to verify the python source
 > release, that might be because you are using Python 3.9 (or later versions)
 > with pip3. The issue could be fixed by using e.g. Python 3.8.
 >
 > The supported python versions are documented in flink-ml-python/README.md
 > but we forgot to enforce this check in the setup.py previously. We have
 > merged PR (link ) to improve
 > the error message in the master branch.
 >
 >
 >
 >
 > On Mon, Jan 3, 2022 at 10:10 PM Till Rohrmann 
 > wrote:
 >
 > > +1 (binding)
 > >
 > > - Checked the checksums and signatures
 > > - Built java part from source release
 > > - Ran all Java tests
 > > - Checked the blog post PR
 > >
 > > What I did not manage to do is to build the Python part locally. I assume
 > > that this was due to my local Python setup. Maybe somebody else can
 > double
 > > check this part if not already done.
 > >
 > > Cheers,
 > > Till
 > >
 > > On Fri, Dec 31, 2021 at 10:16 AM Xingbo Huang 
 > wrote:
 > >
 > > > +1 (non-binding)
 > > >
 > > > - Verified checksums and signatures
 > > > - Pip install the apache-flink-ml package
 > > > - Run the apache-flink-ml tests (We will add more python examples in
 > the
 > > > next release https://issues.apache.org/jira/browse/FLINK-25497).
 > > >
 > > > Best,
 > > > Xingbo
 > > >
 > > > Zhipeng Zhang wrote on Fri, Dec 31, 2021 at 15:01:
 > > >
 > > > > +1 (non-binding)
 > > > >
 > > > > - Verified that the checksums and GPG files match the corresponding
 > > > release
 > > > > files
 > > > > - Verified that the source distributions do not contain any binaries
 > > > > - Built the source distribution with Maven to ensure all source files
 > > > have
 > > > > Apache headers
 > > > > - Verified that all POM files point to the same version
 > > > > - Verified that the README.md file does not have anything unexpected
 > > > > - Verified the NOTICE and LICENSE follows the rules
 > > > > - Checked JIRA release notes
 > > > > - Checked source code tag "release-2.0.0-rc3"
 > > > >
 > > > >
 > > > > Dong Lin wrote on Fri, Dec 31, 2021 at 09:09:
 > > > >
 > > > > > +1 (non-binding)
 > > > > >
 > > > > > - Verified that the checksums and GPG files match the corresponding
 > > > > release
 > > > > > files
 > > > > > - Verified that the source distributions do not contain any
 > binaries
 > > > > > - Built the source distribution with Maven to ensure all source
 > files
 > > > > have
 > > > > > Apache headers
 > > > > > - Verified that all POM files point to the same version
 > > > > > - Verified that the README.md file does not have anything
 > unexpected
 > > > > > - Verified the NOTICE and LICENSE follows the rules specified in
 > the
 > > > wiki
 > > > > > <
 > > > > >
 > > > >
 > > >
 > >
 > https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release
 > > > > > >
 > > > > > .
 > > > > > - Checked JIRA release notes
 > > > > > - Checked source code tag "release-2.0.0-rc2"
 > > > > > - Checked flink-web PR
 > > > > >
 > > > > >
 > > > > >
 > > > > > On Fri, Dec 31, 2021 at 1:24 AM Yun Gao 
 > > wrote:
 > > > > >
 > > > > > > Hi everyone,
 > > > > > >
 > > > > > >
 > > > > > >
 > > > > > > Please review and vote on the release candidate #3 for the
 > version
 > > > > 2.0.0
 > > > > > > of Apache Flink ML,
 > > > > > >
 > > > > > > as follows:
 > > > > > >
 > > > > > > [ ] +1, Approve the release
 > > > > > >
 > > > > > > [ ] -1, Do not approve the release (please provide specific
 > > comments)
 > > > > > >
 > > > > > >
 > > > > > >
 > > > > > > **Testing Guideline**
 > > > > > >
 > > > > > >
 > > > > > >
 > > > > > > You can find here [1] a page in the project wiki on instructions
 > > for
 > > > > > > testing. To cast a vote, it is not necessary to perform all
 > listed
 > > > > > checks,
 > > > > > > but please mention which checks you have performed when voting.
 > > > > > >
 > > > > > >
 > > > > > >
 > > > > > > **Release Overview**
 > > > > > >
 > > > > > >
 > > > > > >
 > > 

[jira] [Created] (FLINK-25551) Add example and documentation on the usage of Row in Python UDTF

2022-01-05 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-25551:


 Summary: Add example and documentation on the usage of Row in 
Python UDTF
 Key: FLINK-25551
 URL: https://issues.apache.org/jira/browse/FLINK-25551
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Documentation
Affects Versions: 1.14.2, 1.13.5, 1.15.0
Reporter: Huang Xingbo


The following example comes from pyflink users:

{code:python}
source_table = """
CREATE TABLE source_table (
  a ARRAY>,
  b ARRAY
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '10'
)
"""


@udtf(result_types=[DataTypes.STRING(), DataTypes.DOUBLE()])
def split(x: list):
    for s in x:
        yield s


@udtf(result_types=[
    DataTypes.ROW([
        DataTypes.FIELD("PRODID", DataTypes.STRING()),
        DataTypes.FIELD("ADDMONEY", DataTypes.DOUBLE())])])
def split2(x: list):
    for s in x:
        yield s,  # NOTE: This ',' is important


t_env.execute_sql(source_table)

# If you want to split the Row into two columns
t_env.from_path("source_table").join_lateral(split(col('a')).alias('x', 'y')).select("x, y")

# If you want to treat the entire row as a column
t_env.from_path("source_table").join_lateral(split2(col('a')).alias('x')).select("x")
{code}




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] JUnit 5 Migration

2022-01-05 Thread Hang Ruan
Hi, Ryan,
Thanks a lot for helping with the migration. Some modules are already
migrated by us, but the code hasn't been merged since we still have some
pending details to discuss.

These modules are flink-runtime, flink-core, flink-test-utils,
flink-runtime-web,
flink-yarn, flink-kubernetes, flink-dstl, flink-sql-parser,
flink-clients, flink-streaming-java, flink-optimizer, flink-connector-base
and flink-connector-kafka. We have created issues for these modules. Sorry
for the confusion.

As for the commit author, we have some concerns about using individual
authors for commits. Migrating to JUnit 5 will touch a huge number of code
lines, and these changes are not quite helpful for tracing the actual
evolution of these test cases using git blame. Last year the Flink project
had a huge code reformatting commit touching almost all code (FLINK-20651),
and we ignore this
commit in git blame to avoid polluting the git history. Maybe we can use
a similar approach for the JUnit migration commits.

Best,
Hang

Ryan Skraba wrote on Wed, Jan 5, 2022 at 18:39:

> Hello!  I can help out with the effort -- I've got a bit of experience with
> JUnit 4 and 5 migration, and it looks like even with the AssertJ scripts
> there's going to be a lot of mechanical and manual work to be done.  The
> migration document looks pretty comprehensive!
>
> For the remaining topics to be discussed:
>
> I don't have a strong opinion on what to do about parameterized tests that
> use inheritance, although Jing Ge's proposal sounds reasonable and easy to
> follow.  I wouldn't be worried about temporarily redundant test code too
> much if it simplifies getting us into a good final state, especially since
> redundant code would be easy to spot and remove when we get rid of JUnit 4
> artifacts.
>
> Getting rid of PowerMock sounds fine to me.
>
> I don't think it's necessary to have a common author for commits, given
> that commits will have the [JUnit5 migration] tag.  I guess my preference
> would be to have "one or a few" commits per module, merged progressively.
>
> Is there an existing branch on a repo with some of the modules already
> migrated?
>
> All my best, Ryan
>
> On Fri, Dec 17, 2021 at 5:19 PM Jing Ge  wrote:
>
> > Thanks Hang and Qingsheng for your effort and starting this discussion.
> As
> > additional information, I've created an umbrella ticket(
> > https://issues.apache.org/jira/browse/FLINK-25325). It is recommended to
> > create all JUnit5 migration related tasks under it, so we could track the
> > whole migration easily.
> >
> > I think, for the parameterized test issue, the major problem is that, on
> > one hand, JUnit 5 has its own approach to make parameterized tests and it
> > does not allow to use parameterized fixtures at class level. This is a
> huge
> > difference compared to JUnit4. On the other hand, currently, there are
> many
> > cross module test class inheritances, which means that the migration
> could
> > not be done in one shot. It must be allowed to run JUnit4 and JUnit5
> tests
> > simultaneously during the migration process. As long as there are sub
> > parameterized test classes in JUnit4, it will be risky to migrate the
> > parent class to JUnit5. And if the parent class has to stick with JUnit4
> > during the migration, any migrated JUnit5 subclass might need to
> duplicate
> > the test methods defined in the parent class. In this case, I would
> prefer
> > to duplicate the test methods with different names in the parent class
> for
> > both JUnit4 and JUnit5 only during the migration process as temporary
> > solution and remove the test methods for JUnit4 once the migration
> process
> > is finished, i.e. when all subclasses are JUnit5 tests. It is a trade-off
> > solution. Hopefully we could find another better solution during the
> > discussion.
> >
> > Speaking of replacing @Test with @TestTemplate, since I did read all
> tests,
> > do we really need to replace all of them with @TestTemplate w.r.t. the
> > parameterized tests?
> >
> > For the PowerMock tests, it is a good opportunity to remove them.
> >
> > best regards
> > Jing
> >
> > On Fri, Dec 17, 2021 at 2:14 PM Hang Ruan 
> wrote:
> >
> > > Hi, all,
> > >
> > > Apache Flink is using JUnit for unit tests and integration tests widely
> > in
> > > the project, however, it binds to the legacy JUnit 4 deeply. It is
> > > important to migrate existing cases to JUnit 5 in order to avoid
> > splitting
> > > the project into different JUnit versions.
> > >
> > > Qingsheng Ren and I have conducted some trials about the JUnit 5
> > migration,
> > > but there are too many modules that need to migrate. We would like to
> get
> > > more help from the community. It is planned to migrate module by
> module,
> > > and a JUnit 5 migration guide
> > > <
> > >
> >
> https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit?usp=sharing
> > > >[1]
> > > has been provided to new helpers on the 
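
For reference on the parameterized-test point discussed above, a minimal
JUnit 5 sketch, purely illustrative and not taken from the Flink code base,
assuming JUnit Jupiter and AssertJ are on the classpath:

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

// In JUnit 5, parameters are declared per test method rather than per class,
// which is why JUnit 4 class-level parameterized fixtures do not map 1:1.
// Flink's own migration may instead rely on @TestTemplate with a custom
// extension, as mentioned in the thread.
class WordLengthTest {

    @ParameterizedTest
    @ValueSource(strings = {"flink", "junit", "migration"})
    void lengthIsPositive(String word) {
        assertThat(word.length()).isPositive();
    }
}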

Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)

2022-01-05 Thread Becket Qin
Thanks for the explanation, Till. I like the idea, but have a question
about the first step.

After the first step, would users be able to actually use the dynamic
patterns in CEP?

In the first step you mentioned, the commands and formats for a new CEP
pattern seem related to how users would ingest the commands. If we go with
the OC, these commands and formats would become internal interfaces. The
end users would just use the REST API, or in the beginning, implement a
Java plugin of dynamic pattern provider. In this case our focus would be on
designing a good plugin interface. On the other hand, if we go with the
side-input, users would need to know the command format so they can send
the commands to the CEP operator. Therefore we need to think about stuff
like versioning, request validation and backwards compatibility.

Also, because the public interface is all about how the users can ingest
the dynamic patterns, it seems we still need to figure that out before we
can conclude the FLIP.

Assuming the first step closes this FLIP and after that users would be able
to use the CEP dynamic pattern, are you suggesting the following?

1. We will design the commands and format of CEP dynamic pattern, and also
how CEP operators take them into effect. This design would assume that
users can send the commands directly to the CEP operator via side-input. So
the protocol would take versioning and format evolution into account. After
the first step, the users CAN make dynamic pattern work with the help from
some external dependencies and maintenance effort.

2. Discuss a control plane from the use case of CEP dynamic pattern,
and let CEP dynamic pattern use that control plane if we eventually think
that is the right way to go.

Assuming that we are not going to stop after step 1 is done, will the end
state be that the CEP dynamic pattern supports both approaches? It is not
clear to me how this would work. For example, will a pattern ingested from
side-input be managed by OC as well? If more and more users pick the more
ergonomic way, would the side-input option just die out? In that case, will
we deprecate that?

My main concern is that the time and work we invest in step 1 will be
thrown away. Not only that, at the end of step 1, in order to actually use
the feature and make it production ready, there is a lot of work for the
users to do. All that work may also be in vain. So it would be good if
we can avoid that by having a clean approach to begin with. What do you
think?

Thanks,

Jiangjie (Becket) Qin


On Wed, Jan 5, 2022 at 11:35 PM Till Rohrmann  wrote:

> I think I would scope the effort slightly differently. Note that I might be
> missing some requirements or overlook something.
>
> 1. Enable Flink to support CEP dynamic patterns
>
> Here I would define the commands and formats for a new CEP pattern. Then I
> would extend the CEP operator to understand these commands so that we can
> change the CEP patterns dynamically. This should give the building blocks
> to set up a job where you can change the CEP patterns by reading from an
> arbitrary side input the CEP pattern commands. Note that it will be the
> responsibility of the user to ingest the data somehow. This is nothing
> Flink is concerned with at this time.
>
> 2. Think about how to make the CEP dynamic pattern feature more ergonomic
>
> Here we can pick up the discussion about OC and having a REST ingestion
> endpoint in Flink vs. offering tools that live outside of Flink itself. I
> could imagine that this could become a separate small project that builds
> upon Flink, for example.
>
> Cheers,
> Till
>
> On Wed, Jan 5, 2022 at 2:19 PM Becket Qin  wrote:
>
> > Hi Till,
> >
> > Thanks for the prompt reply. Like you said, we are indeed using the
> dynamic
> > CEP pattern use case to test the existing primitives in Flink to see if
> > they can meet the requirements. I fully understand the concern of
> > exposing OC as a user interface. Meanwhile I see CEP dynamic patterns as
> a
> > good opportunity to battle test and enhance the OC as a user facing
> control
> > plane which is currently missing. After all, there is no better person
> than
> > ourselves to try it out first.
> >
> > It is not clear to me whether it is worth continuing the effort of
> > supporting dynamic CEP pattern without concluding the control plane
> > discussion. Let's say we have a CEP job reading from Kafka. To make this
> > work with side-input, a few things need to be done.
> >
> >1. In order to support dynamic patterns, users would create another
> >Kafka topic as side-input to receive dynamic patterns.
> >2. In order to insert dynamic patterns, users would use a separate web
> >server that is provided by us as a separate tool. The web server takes
> > http
> >requests and sends dynamic pattern records to Kafka via a Kafka sink
> > (using
> >a KafkaProducer is likely simpler here, though).
> >3. Regarding querying the running dynamic 

[jira] [Created] (FLINK-25545) [JUnit5 Migration] Module: flink-clients

2022-01-05 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-25545:
-

 Summary: [JUnit5 Migration] Module: flink-clients
 Key: FLINK-25545
 URL: https://issues.apache.org/jira/browse/FLINK-25545
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Hang Ruan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25550) [JUnit5 Migration] Module: flink-kubernetes

2022-01-05 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-25550:
-

 Summary: [JUnit5 Migration] Module: flink-kubernetes
 Key: FLINK-25550
 URL: https://issues.apache.org/jira/browse/FLINK-25550
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Hang Ruan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25549) [JUnit5 Migration] Module: flink-dstl

2022-01-05 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-25549:
-

 Summary: [JUnit5 Migration] Module: flink-dstl
 Key: FLINK-25549
 URL: https://issues.apache.org/jira/browse/FLINK-25549
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Hang Ruan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25548) [JUnit5 Migration] Module: flink-sql-parser

2022-01-05 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-25548:
-

 Summary: [JUnit5 Migration] Module: flink-sql-parser
 Key: FLINK-25548
 URL: https://issues.apache.org/jira/browse/FLINK-25548
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Hang Ruan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25547) [JUnit5 Migration] Module: flink-optimizer

2022-01-05 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-25547:
-

 Summary: [JUnit5 Migration] Module: flink-optimizer
 Key: FLINK-25547
 URL: https://issues.apache.org/jira/browse/FLINK-25547
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Hang Ruan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25546) [JUnit5 Migration] Module: flink-connector-base

2022-01-05 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-25546:
-

 Summary: [JUnit5 Migration] Module: flink-connector-base
 Key: FLINK-25546
 URL: https://issues.apache.org/jira/browse/FLINK-25546
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Hang Ruan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25544) [JUnit5 Migration] Module: flink-streaming-java

2022-01-05 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-25544:
-

 Summary: [JUnit5 Migration] Module: flink-streaming-java
 Key: FLINK-25544
 URL: https://issues.apache.org/jira/browse/FLINK-25544
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Hang Ruan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25543) [JUnit5 Migration] Module: flink-yarn

2022-01-05 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-25543:
-

 Summary: [JUnit5 Migration] Module: flink-yarn
 Key: FLINK-25543
 URL: https://issues.apache.org/jira/browse/FLINK-25543
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Hang Ruan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25542) [JUnit5 Migration] Module: flink-runtime-web

2022-01-05 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-25542:
-

 Summary: [JUnit5 Migration] Module: flink-runtime-web
 Key: FLINK-25542
 URL: https://issues.apache.org/jira/browse/FLINK-25542
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Hang Ruan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25541) [JUnit5 Migration] Module: flink-test-utils

2022-01-05 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-25541:
-

 Summary: [JUnit5 Migration] Module: flink-test-utils
 Key: FLINK-25541
 URL: https://issues.apache.org/jira/browse/FLINK-25541
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Hang Ruan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25540) [JUnit5 Migration] Module: flink-runtime

2022-01-05 Thread Hang Ruan (Jira)
Hang Ruan created FLINK-25540:
-

 Summary: [JUnit5 Migration] Module: flink-runtime
 Key: FLINK-25540
 URL: https://issues.apache.org/jira/browse/FLINK-25540
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Hang Ruan






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25538) Migrate flink-connector-kafka to JUnit 5

2022-01-05 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-25538:
-

 Summary: Migrate flink-connector-kafka to JUnit 5
 Key: FLINK-25538
 URL: https://issues.apache.org/jira/browse/FLINK-25538
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Kafka
Reporter: Qingsheng Ren






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25539) Reading OSS files through a connector created with Flink's BatchTableEnvironment (parallelism 16) sometimes fails with a thread error: Null IO stream

2022-01-05 Thread Jira
王康 created FLINK-25539:
--

 Summary: Reading OSS files through a connector created with Flink's 
BatchTableEnvironment (parallelism 16) sometimes fails with a thread error: Null IO stream
 Key: FLINK-25539
 URL: https://issues.apache.org/jira/browse/FLINK-25539
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.13.2
 Environment: Linux; flink-taskmanager deployed on three machines, each with 16 cores and 32 GB of memory
Reporter: 王康


Null IO stream: we are using Flink 1.13.2 with a connector to read OSS files.

The BatchTableEnvironment parallelism is set to 16 across three machine nodes; one of 
the nodes sometimes throws a "Null IO stream" error while reading, causing the job to fail.
{code:java}
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(fbEnv);

fbEnv.setParallelism(16);
//
tableEnv.connect(new FileSystem().path(ossPath))
.withFormat(new Csv().fieldDelimiter(allTable.getSeparator().charAt(0)))
.withSchema(schema)
.createTemporaryTable(ossTableName); {code}
{code:java}
// The detailed Flink error stack trace follows
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy   at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
   at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
   at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
   at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
   at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
   at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
   at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
   at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
   at sun.reflect.GeneratedMethodAccessor113.invoke(Unknown Source)   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:498)   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
   at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
   at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)   at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)   at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)   at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)   at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)   at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)   at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)   at 
akka.actor.Actor$class.aroundReceive(Actor.scala:517)   at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)   at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)   at 
akka.actor.ActorCell.invoke(ActorCell.scala:561)   at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)   at 
akka.dispatch.Mailbox.run(Mailbox.scala:225)   at 
akka.dispatch.Mailbox.exec(Mailbox.scala:235)   at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)   at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)   
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)   at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.io.IOException: Null IO stream   at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSInputStream.reopen(AliyunOSSInputStream.java:176)
   at 
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.aliyun.oss.AliyunOSSInputStream.read(AliyunOSSInputStream.java:235)
   at java.io.DataInputStream.read(DataInputStream.java:149)   at 
org.apache.flink.fs.osshadoop.common.HadoopDataInputStream.read(HadoopDataInputStream.java:96)
   at 
org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:742)
   at 
org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:586)
   at 
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:505)
   at 
org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:50)
   at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)   at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)   at 
java.lang.Thread.run(Thread.java:745)
{code}



--
This message was sent 

[jira] [Created] (FLINK-25537) Migrate flink-core to JUnit 5

2022-01-05 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-25537:
-

 Summary: Migrate flink-core to JUnit 5
 Key: FLINK-25537
 URL: https://issues.apache.org/jira/browse/FLINK-25537
 Project: Flink
  Issue Type: Sub-task
  Components: API / Core
Reporter: Qingsheng Ren






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25536) Minor Fix: Adjust the order of variable declaration and comment in StateAssignmentOperation

2022-01-05 Thread Junfan Zhang (Jira)
Junfan Zhang created FLINK-25536:


 Summary: Minor Fix: Adjust the order of variable declaration and 
comment in StateAssignmentOperation
 Key: FLINK-25536
 URL: https://issues.apache.org/jira/browse/FLINK-25536
 Project: Flink
  Issue Type: Improvement
Reporter: Junfan Zhang






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25535) The JVM parameter does not take effect

2022-01-05 Thread Bo Cui (Jira)
Bo Cui created FLINK-25535:
--

 Summary: The JVM parameter does not take effect
 Key: FLINK-25535
 URL: https://issues.apache.org/jira/browse/FLINK-25535
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.14.0, 1.13.0, 1.12.0, 1.15.0
Reporter: Bo Cui


[https://github.com/apache/flink/blob/dd1fddb13b2d08ade580e5b3ec6b8e910974308d/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java#L188]
{code:java}
.map(e -> String.format("-D %s=%s", e.getKey(), e.getValue())) {code}
There should be no space between -D and %s; with the space, the generated JVM 
parameter does not take effect.
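
For illustration, a minimal standalone sketch (not Flink code) of why the space 
matters once the formatted string ends up on a JVM command line:
{code:java}
import java.util.Map;
import java.util.stream.Collectors;

public class DynamicPropsExample {
    public static void main(String[] args) {
        Map<String, String> props = Map.of("taskmanager.memory.process.size", "2g");

        // Current formatting: "-D key=value". On a JVM command line the space makes
        // "-D" and "key=value" two separate arguments, so the property is never set.
        String withSpace = props.entrySet().stream()
                .map(e -> String.format("-D %s=%s", e.getKey(), e.getValue()))
                .collect(Collectors.joining(" "));

        // Without the space, the flag is parsed as a system property definition.
        String withoutSpace = props.entrySet().stream()
                .map(e -> String.format("-D%s=%s", e.getKey(), e.getValue()))
                .collect(Collectors.joining(" "));

        System.out.println(withSpace);    // -D taskmanager.memory.process.size=2g
        System.out.println(withoutSpace); // -Dtaskmanager.memory.process.size=2g
    }
}
{code}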



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25534) execute pre-job throws org.apache.flink.table.api.TableException: Failed to execute sql

2022-01-05 Thread jychen (Jira)
jychen created FLINK-25534:
--

 Summary: execute pre-job throws 
org.apache.flink.table.api.TableException: Failed to execute sql
 Key: FLINK-25534
 URL: https://issues.apache.org/jira/browse/FLINK-25534
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: jychen


org.apache.flink.table.api.TableException: Failed to execute sql
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:777)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:742)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
    at 
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
    at com.cig.cdp.flink.JobApplication.main(JobApplication.java:91) 
~[flink-streaming-core.jar:1.0.0-SNAPSHOT]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:1.8.0_312]
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_312]
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_312]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_312]
    at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]

...skipping 1 line
    at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_312]
    at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_312]
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
 [hadoop-common-3.0.0.jar:?]
    at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 [flink-dist_2.12-1.13.3.jar:1.13.3]
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) 
[flink-dist_2.12-1.13.3.jar:1.13.3]
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could 
not deploy Yarn job cluster.
    at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:137)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
    at 
org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:759)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
    ... 19 more
Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: 
The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1641278381631_0020 failed 2 
times in previous 1 milliseconds due to AM Container for 
appattempt_1641278381631_0020_02 exited with exitCode: 1
Failing this attempt.Diagnostics: [2022-01-06 09:15:11.758]Exception from 
container-launch.
Container id: container_1641278381631_0020_02_01
Exit code: 1

[2022-01-06 09:15:11.759]Container exited with a non-zero exit code 1. Error 
file: prelaunch.err.
Last 4096 bytes of prelaunch.err :

[2022-01-06 09:15:11.760]Container exited with a non-zero exit code 1. Error 
file: prelaunch.err.
Last 4096 bytes of prelaunch.err :

For more detailed output, check the application tracking page: 
http://master:8088/cluster/app/application_1641278381631_0020 Then click on 
links to logs of each attempt.
. Failing the application.
If log aggregation is enabled on your cluster, use this command to further 
investigate the issue:
yarn logs -applicationId application_1641278381631_0020
    at 

Re: [DISCUSS] Disabling JNDI by default

2022-01-05 Thread Martijn Visser
Hi Till,

I think it would be great if we could achieve this so that Flink would be
'hardened' by default. Hopefully someone in the community has some ideas
how.

Best regards,

Martijn

On Tue, 4 Jan 2022 at 13:19, Till Rohrmann  wrote:

> Hi everyone,
>
> With the latest CVEs around log4j, we have seen that certain functionality
> of the JVM can be quite dangerous. Concretely, the JNDI functionality [1]
> seems to open quite a large attack vector against JVMs which has been used
> in the log4j CVE case.
>
> In order to avoid these kinds of security issues, Stephan had the idea of
> looking into disabling the JNDI functionality by default. It is not clear
> whether this is easily doable but there exist some projects that do it for
> dedicated libraries [2].
>
> That is why I wanted to reach out to the community to ask for help with
> this issue. Maybe you have encountered a similar problem in a different
> context and know how to deal with these issues.
>
> [1]
>
> https://docs.oracle.com/javase/jndi/tutorial/getStarted/overview/index.html#:~:text=The%20Java%20Naming%20and%20Directory,any%20specific%20directory%20service%20implementation
> .
> [2] https://github.com/nccgroup/log4j-jndi-be-gone
>
> Cheers,
> Till
>


[jira] [Created] (FLINK-25533) Preferred AllocationIDs are not respected when fulfilling pending slot requests

2022-01-05 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25533:
-

 Summary: Preferred AllocationIDs are not respected when fulfilling 
pending slot requests
 Key: FLINK-25533
 URL: https://issues.apache.org/jira/browse/FLINK-25533
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.14.2, 1.13.5, 1.15.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.15.0


In order to make best use of local recovery, we have to forward the set of 
preferred allocations to the {{DeclarativeSlotPoolBridge}} where new slots are 
matched with pending slot requests. At the moment this is not the case and this 
means that whenever we try to recover locally while not having all slots 
available, we might do wrong scheduling decisions.

In order to improve the situation, I propose to forward the set of preferred 
allocations to the {{DeclarativeSlotPoolBridge}}.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

2022-01-05 Thread Zhipeng Zhang
Hi Xuannan,

Thanks for the reply.

Regarding whether and how to support caching the side output, I agree that the
second option might be better if there does exist a use case where users need
to cache only certain side outputs.
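
For reference, a rough sketch of the identity-map workaround described in the
quoted reply below. OutputTag, getSideOutput and ProcessFunction are existing
DataStream API, while cache() is the method proposed in FLIP-205 and purely
hypothetical here:

// Sketch only: cache() is the FLIP-205 proposal, not existing Flink API.
final OutputTag<String> sideTag = new OutputTag<String>("side") {};

SingleOutputStreamOperator<String> main = env.fromElements("a", "b", "c")
        .process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                out.collect(value);                       // main output
                ctx.output(sideTag, value.toUpperCase()); // side output
            }
        });

// Workaround described below: getSideOutput returns a plain DataStream, so an
// identity map is applied first to obtain an operator whose result can be cached.
DataStream<String> cachedSide = main.getSideOutput(sideTag)
        .map(x -> x)   // identity map
        .cache();      // hypothetical FLIP-205 method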


Xuannan Su wrote on Tue, Jan 4, 2022 at 15:50:

> Hi Zhipeng and Gen,
>
> Thanks for joining the discussion.
>
> For Zhipeng:
>
> - Can we support side output
> Caching the side output is indeed a valid use case. However, with the
> current API, it is not straightforward to cache the side output. You
> can apply an identity map function to the DataStream returned by the
> getSideOutput method and then cache the result of the map
> transformation. In my opinion, it is not user-friendly. Therefore, we
> should think of a way to better support the use case.
> As you say, we can introduce a new class
> `CachedSingleOutputStreamOperator`, and overwrite the `getSideOutput`
> method to return a `CachedDatastream`. With this approach, the cache
> method implies that both output and the side output of the
> `SingleOutputStreamOperatior` are cached. The problem with this
> approach is that the user has no control over which side output should
> be cached.
> Another option would be to let the `getSideOutput` method return the
> `SingleOutputStreamOperator`. This way, users can decide which side
> output to cache. As the `getSideOutput` method returns a
> `SingleOutputStreamOperator`, users can set properties of the
> transformation that produces the side output, e.g. parallelism, buffer
> timeout, etc. If users try to set different values of the same
> property of a transformation, an exception will be thrown. What do you
> think?
>
> - Can we support Stream Mode
> Running a job in stream mode doesn't guarantee the job will finish,
> while in batch mode, it does.  This is the main reason that prevents
> us from supporting cache in stream mode. The cache cannot be used
> unless the job can finish.
> If I understand correctly, by "run batch jobs in Stream Mode", you
> mean that you have a job with all bounded sources, but you want the
> intermediate data to shuffle in pipelined mode instead of blocking
> mode. If that is the case, the job can run in batch mode with
> "execution.batch-shuffle-mode" set to "ALL_EXCHANGES_PIPELINED" [1].
> And we can support caching in this case.
>
> - Change parallelism of CachedDataStream
> CachedDataStream extends from DataStream, which doesn't have the
> `setParallelism` method like the `SingleOutputStreamOperator`. Thus,
> it should not be a problem with CachedDataStream.
>
> For Gen:
>
> - Relation between FLIP-205 and FLIP-188
> Although it feels like dynamic table and caching are similar in the
> sense that they let users reuse some intermediate result, they target
> different use cases. The dynamic table is targeting the use case where
> users want to share a dynamic updating intermediate result across
> multiple applications. It is some meaningful data that can be consumed
> by different Flink applications and Flink jobs. While caching is
> targeting the use case where users know that all the sources are
> bounded and static, and caching is only used to avoid re-computing the
> intermediate result. And the cached intermediate result is only
> meaningful across jobs in the same application.
>
> Dynamic table and caching can be used together. For example, in a
> machine learning scenario, we can have a Stream job that is generating
> some training samples. And we can create a dynamic table for the
> training sample. And we run a Flink application every hour to do some
> data analysis on the training sample generated in the last hour. The
> Flink application consists of multiple batch jobs and the batch jobs
> share some intermediate results, so users can use cache to avoid
> re-computation. The intermediate result is not meaningful outside of
> the application. And the cache will be discarded after the application
> is finished.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#execution-batch-shuffle-mode
>
>
> On Thu, Dec 30, 2021 at 7:00 PM Gen Luo  wrote:
> >
> > Hi Xuannan,
> >
> > I found FLIP-188[1] that is aiming to introduce a built-in dynamic table
> > storage, which provides a unified changelog & table representation.
> Tables
> > stored there can be used in further ad-hoc queries. To my understanding,
> > it's quite like an implementation of caching in Table API, and the ad-hoc
> > queries are somehow like further steps in an interactive program.
> >
> > As you replied, caching at Table/SQL API is the next step, as a part of
> > interactive programming in Table API, which we all agree is the major
> > scenario. What do you think about the relation between it and FLIP-188?
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> >
> >
> > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su 
> wrote:
> >
> > > Hi David,
> > >
> > > Thanks for sharing your 

Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)

2022-01-05 Thread Till Rohrmann
I think I would scope the effort slightly differently. Note that I might be
missing some requirements or overlook something.

1. Enable Flink to support CEP dynamic patterns

Here I would define the commands and formats for a new CEP pattern. Then I
would extend the CEP operator to understand these commands so that we can
change the CEP patterns dynamically. This should give the building blocks
to set up a job where you can change the CEP patterns by reading from an
arbitrary side input the CEP pattern commands. Note that it will be the
responsibility of the user to ingest the data somehow. This is nothing
Flink is concerned with at this time.

2. Think about how to make the CEP dynamic pattern feature more ergonomic

Here we can pick up the discussion about OC and having a REST ingestion
endpoint in Flink vs. offering tools that live outside of Flink itself. I
could imagine that this could become a separate small project that builds
upon Flink, for example.

Cheers,
Till

On Wed, Jan 5, 2022 at 2:19 PM Becket Qin  wrote:

> Hi Till,
>
> Thanks for the prompt reply. Like you said, we are indeed using the dynamic
> CEP pattern use case to test the existing primitives in Flink to see if
> they can meet the requirements. I fully understand the concern of
> exposing OC as a user interface. Meanwhile I see CEP dynamic patterns as a
> good opportunity to battle test and enhance the OC as a user facing control
> plane which is currently missing. After all, there is no better person than
> ourselves to try it out first.
>
> It is not clear to me whether it is worth continuing the effort of
> supporting dynamic CEP pattern without concluding the control plane
> discussion. Let's say we have a CEP job reading from Kafka. To make this
> work with side-input, a few things need to be done.
>
>1. In order to support dynamic patterns, users would create another
>Kafka topic as side-input to receive dynamic patterns.
>2. In order to insert dynamic patterns, users would use a separate web
>server that is provided by us as a separate tool. The web server takes
> http
>requests and sends dynamic pattern records to Kafka via a Kafka sink
> (using
>a KafkaProducer is likely simpler here, though).
>3. Regarding querying the running dynamic patterns, given Kafka is not
>queryable, users would probably introduce a database and insert the
>patterns there so they can query the running patterns. Maybe this could
> be
>done by the CEP operator side-output, so there is less chance of
>inconsistency between Kafka and the database.
>4. If the Flink job is to be stopped, the dynamic pattern Kafka topic
>needs to be deleted, the companion web server also needs to be stopped
>(assuming it is not shared with other CEP jobs), and the database table
>storing the dynamic pattern needs to be dropped.
>
> Please correct me if I misunderstood something, but this seems quite
> involved. Moreover, all the work here is going to be thrown away after we
> have OC as a decent user facing control plane in place. And we will likely
> have a backwards incompatible API change here. Given that, I am wondering
> if we should wait until the OC discussion concludes before moving on with
> the dynamic patterns?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Wed, Jan 5, 2022 at 5:53 PM Till Rohrmann  wrote:
>
> > Thanks for the detailed explanation Becket.
> >
> > Do you think that an additional dependency is a deal breaker for people
> to
> > use dynamic CEP patterns? At the very least people have to operate some
> > kind of storage/queue system from which the CEP job can read anyway.
> Maybe
> > it could be good enough to provide a REST endpoint (as a separate tool)
> > that can be instantiated with a Flink sink to ingest REST requests
> > into some queue. A general concern I have is that by making the JM a REST
> > ingestion point for data will push another responsibility to Flink and
> > increase the surface area further.
> >
> > For how to handle data that cannot be processed by patterns, I think
> there
> > also exist other solutions. I could imagine that users could define
> > different failover strategies. E.g. one could simply ignore the record,
> the
> > pattern could get deactivated on the affected TM or the processing fails.
> >
> > Maybe we are coupling the dynamic CEP pattern effort too much on where
> the
> > new patterns come from. Maybe we can split these efforts into supporting
> > dynamic CEP patterns on the TM reading from some source and then fork off
> > the discussion about introducing a user controlled control plane to
> Flink.
> > That way we wouldn't block this effort and could discuss more about the
> > exact properties such a user control plane would need to have. What do
> you
> > think?
> >
> > Cheers,
> > Till
> >
> > On Wed, Jan 5, 2022 at 7:18 AM Becket Qin  wrote:
> >
> > > Hi Till,
> > >
> > > Thanks for the comments and questions. To be clear, I am not saying
> 

Re: [VOTE] Apache Flink ML Release 2.0.0, release candidate #3

2022-01-05 Thread Becket Qin
+1 (binding)

- Verified the checksum and signature
- Built java code and ran all the tests
- Installed python packages according to the instructions in README.md
(there are some dependency conflicts, but it looks like they are due to my local
environment issues.)

Regards,

Jiangjie (Becket) Qin

On Wed, Jan 5, 2022 at 8:51 PM Dian Fu  wrote:

> +1 (binding)
>
> - Verified the checksum and signature
> - Build the Java code and also run the tests using `mvn clean verify`
> - Checked the NOTICE file
> - Pip installed the python package in MacOS under Python 3.7
> - Reviewed the flink-web PR
>
> Regards,
> Dian
>
> On Tue, Jan 4, 2022 at 12:25 AM Dong Lin  wrote:
>
> > Thank you Till for the vote.
> >
> > FYI, if you were not able to use `pip3 install` to verify the python
> source
> > release, that might be because you are using Python 3.9 (or later
> versions)
> > with pip3. The issue could be fixed by using e.g. Python 3.8.
> >
> > The supported python versions are documented in flink-ml-python/README.md
> > but we forgot to enforce this check in the setup.py previously. We have
> > merged PR (link ) to improve
> > the error message in the master branch.
> >
> >
> >
> >
> > On Mon, Jan 3, 2022 at 10:10 PM Till Rohrmann 
> > wrote:
> >
> > > +1 (binding)
> > >
> > > - Checked the checksums and signatures
> > > - Built java part from source release
> > > - Ran all Java tests
> > > - Checked the blog post PR
> > >
> > > What I did not manage to do is to build the Python part locally. I
> assume
> > > that this was due to my local Python setup. Maybe somebody else can
> > double
> > > check this part if not already done.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Fri, Dec 31, 2021 at 10:16 AM Xingbo Huang 
> > wrote:
> > >
> > > > +1 (non-binding)
> > > >
> > > > - Verified checksums and signatures
> > > > - Pip install the apache-flink-ml package
> > > > - Run the apache-flink-ml tests (We will add more python examples in
> > the
> > > > next release https://issues.apache.org/jira/browse/FLINK-25497).
> > > >
> > > > Best,
> > > > Xingbo
> > > >
> > > > > Zhipeng Zhang  wrote on Fri, Dec 31, 2021 at 15:01:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > - Verified that the checksums and GPG files match the corresponding
> > > > release
> > > > > files
> > > > > - Verified that the source distributions do not contain any
> binaries
> > > > > - Built the source distribution with Maven to ensure all source
> files
> > > > have
> > > > > Apache headers
> > > > > - Verified that all POM files point to the same version
> > > > > - Verified that the README.md file does not have anything
> unexpected
> > > > > - Verified the NOTICE and LICENSE follows the rules
> > > > > - Checked JIRA release notes
> > > > > - Checked source code tag "release-2.0.0-rc3"
> > > > >
> > > > >
> > > > > > Dong Lin  wrote on Fri, Dec 31, 2021 at 09:09:
> > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > - Verified that the checksums and GPG files match the
> corresponding
> > > > > release
> > > > > > files
> > > > > > - Verified that the source distributions do not contain any
> > binaries
> > > > > > - Built the source distribution with Maven to ensure all source
> > files
> > > > > have
> > > > > > Apache headers
> > > > > > - Verified that all POM files point to the same version
> > > > > > - Verified that the README.md file does not have anything
> > unexpected
> > > > > > - Verified the NOTICE and LICENSE follows the rules specified in
> > the
> > > > wiki
> > > > > > <
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release
> > > > > > >
> > > > > > .
> > > > > > - Checked JIRA release notes
> > > > > > - Checked source code tag "release-2.0.0-rc2"
> > > > > > - Checked flink-web PR
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Dec 31, 2021 at 1:24 AM Yun Gao 
> > > wrote:
> > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Please review and vote on the release candidate #3 for the
> > version
> > > > > 2.0.0
> > > > > > > of Apache Flink ML,
> > > > > > >
> > > > > > > as follows:
> > > > > > >
> > > > > > > [ ] +1, Approve the release
> > > > > > >
> > > > > > > [ ] -1, Do not approve the release (please provide specific
> > > comments)
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > **Testing Guideline**
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > You can find here [1] a page in the project wiki on
> instructions
> > > for
> > > > > > > testing. To cast a vote, it is not necessary to perform all
> > listed
> > > > > > checks,
> > > > > > > but please mention which checks you have performed when voting.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > **Release Overview**
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > As an overview, the release consists of the following:
> > > > > > >
> > > > > > > a) Flink ML source release to be deployed to 

Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

2022-01-05 Thread Till Rohrmann
Thanks for the detailed answer Xingbo. Quick question on the last figure in
the FLIP. You said that this is a real world Flink stream SQL job. The
title of the graph says UDF(String Upper). So do I understand correctly
that string upper is the real world use case you have measured? What I
wanted to ask is how a slightly more complex Flink Python job (involving
shuffles, with back pressure, etc.) performs using the thread and process
mode respectively.

If the mode solely needs changes in the Python part of Flink, then I don't
have any concerns from the runtime perspective.

Cheers,
Till

On Wed, Jan 5, 2022 at 1:55 PM Xingbo Huang  wrote:

> Hi Till and Thomas,
>
> Thanks a lot for joining the discussion.
>
> For Till:
>
> >>> Is the slower performance currently the biggest pain point for our
> Python users? What else are our Python users mainly complaining about?
>
> PyFlink users are most concerned about two parts, one is better usability,
> the other is performance. Users often make some benchmarks when they
> investigate pyflink[1][2] at the beginning to decide whether to use
> PyFlink. The performance of a PyFlink job depends on two parts, one is the
> overhead of the PyFlink framework, and the other is the Python function
> complexity implemented by the user. In the Python ecosystem, there are many
> libraries and tools that can help Python users improve the performance of
> their custom functions, such as pandas[3], numba[4] and cython[5]. So we
> hope that the framework overhead of PyFlink itself can also be reduced.
>
> >>> Concerning the proposed changes, are there any changes required on the
> runtime side (changes to Flink)? How will the deployment and memory
> management be affected when using the thread execution mode?
>
> The changes on PyFlink Runtime mentioned here are actually only
> modifications of PyFlink custom Operators, such as
> PythonScalarFunctionOperator[6], which won't affect deployment and memory
> management.
>
> >>> One more question that came to my mind: How much performance
> improvement do we gain on a real-world Python use case? Were the
> measurements more like micro benchmarks where the Python UDF was called w/o
> the overhead of Flink? I would just be curious how much the Python
> component contributes to the overall runtime of a real world job. Do we
> have some data on this?
>
> The last figure I put in FLIP is the performance comparison of three real
> Flink Stream Sql Jobs. They are a Java UDF job, a Python UDF job in Process
> Mode, and a Python UDF job in Thread Mode. The calculated value of QPS is
> the end-to-end Flink job execution result. As shown in the performance
> comparison chart, the performance of Python udf with the same function can
> often only reach 20% of Java udf, so the performance of python udf will
> often become the performance bottleneck in a PyFlink job.
>
> For Thomas:
>
> The first time that I realized the framework overhead of various IPC
> (socket, grpc, shared memory) cannot be ignored in some scenarios is due to
> an image algorithm prediction job of PyFlink. Its input parameters are a
> series of huge image binary arrays, and its data size is bigger than 1G.
> The performance overhead of serialization/deserialization has become an
> important part of its poor performance. Although this job is a bit extreme,
> through measurement, we did find the impact of the
> serialization/deserialization overhead caused by larger size parameters on
> the performance of the IPC framework.
>
> >>> As I understand it, you measured the difference in throughput for UPPER
> between process and embedded mode and the difference is 50% increased
> throughput?
>
> This 50% is the result when the data size is less than 100byte. When the
> data size reaches 1k, the performance of the Embedded Mode will reach about
> 3.5 times the performance of the Process Mode shown in the FLIP. When the
> data reaches 1M, the performance of Embedded Mode can reach 5 times the
> performance of the Process Mode. The biggest difference here is that in
> Embedded Mode, input/result data does not need to be
> serialized/deserialized.
>
> >>> Is that a typical UDF in your usage?
>
> The reason for choosing UPPER is that a simpler udf implementation can make
> it easier to evaluate the performance of different execution modes.
>
> >>> What do you observe when the function becomes more complex?
>
> We can analyze the QPS of the framework (process mode or embedded mode) and
> the QPS of the UDF calculation logic separately. A more complex UDF means
> that it is a UDF with a smaller QPS. The main factors that affect the
> framework QPS are data type of parameters, number of parameters and size of
> parameters, which will greatly affect the serialization/deserialization
> overhead in Process Mode.
>
> The purpose of introducing thread mode is not to replace Process mode, but
> to supplement Python udf usage scenarios such as cep and join, and some
> scenarios where higher performance 

[jira] [Created] (FLINK-25532) Provide Flink SQL CLI as Docker image

2022-01-05 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-25532:
--

 Summary: Provide Flink SQL CLI as Docker image
 Key: FLINK-25532
 URL: https://issues.apache.org/jira/browse/FLINK-25532
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Client
Reporter: Martijn Visser


Flink is currently available as Docker images. However, the Flink SQL CLI 
isn't available as a Docker image. We should also provide this. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[DISCUSS] Moving connectors from Flink to external connector repositories

2022-01-05 Thread Martijn Visser
Hi everyone,

As already mentioned in the previous discussion thread [1] I'm opening up a
parallel discussion thread on moving connectors from Flink to external
connector repositories. If you haven't read up on this discussion before, I
recommend reading that one first.

The goal with the external connector repositories is to make it easier to
develop and release connectors by not being bound to the release cycle of
Flink itself. It should result in faster connector releases, a more active
connector community and a reduced build time for Flink.

We currently have the following connectors available in Flink itself:

* Kafka -> For DataStream & Table/SQL users
* Upsert-Kafka -> For Table/SQL users
* Cassandra -> For DataStream users
* Elasticsearch -> For DataStream & Table/SQL users
* Kinesis -> For DataStream users & Table/SQL users
* RabbitMQ -> For DataStream users
* Google Cloud PubSub -> For DataStream users
* Hybrid Source -> For DataStream users
* NiFi -> For DataStream users
* Pulsar -> For DataStream users
* Twitter -> For DataStream users
* JDBC -> For DataStream & Table/SQL users
* FileSystem -> For DataStream & Table/SQL users
* HBase -> For DataStream & Table/SQL users
* DataGen -> For Table/SQL users
* Print -> For Table/SQL users
* BlackHole -> For Table/SQL users
* Hive -> For Table/SQL users

I would propose to move out all connectors except Hybrid Source,
FileSystem, DataGen, Print and BlackHole because:

* We should avoid at all costs that certain connectors are considered as
'Core' connectors. If that happens, it creates a perception that there are
first-grade/high-quality connectors because they are in 'Core' Flink and
second-grade/lesser-quality connectors because they are outside of the
Flink codebase. It directly hurts the goal, because these connectors are
still bound to the release cycle of Flink. Last but not least, it risks any
success of external connector repositories since every connector
contributor would still want to be in 'Core' Flink.
* To continue on the quality of connectors, we should aim that all
connectors are of high quality. That means that we shouldn't have a
connector that's only available for either DataStream or Table/SQL users,
but for both. It also means that (if applicable) the connector should
support all options, like bounded and unbounded scan, lookup, batch and
streaming sink capabilities. In the end the quality should depend on the
maintainers of the connector, not on where the code is maintained.
* The Hybrid Source connector is a special connector because of its
purpose.
* The FileSystem, DataGen, Print and BlackHole connectors are important for
first time Flink users/testers. If you want to experiment with Flink, you
will most likely start with a local file before moving to one of the other
sources or sinks. These 4 connectors can help with either reading/writing
local files or generating/displaying/ignoring data.
* Some of the connectors haven't been maintained in a long time (for
example, NiFi and Google Cloud PubSub). An argument could be made that we
check if we actually want to move such a connector or make the decision to
drop the connector entirely.

I'm looking forward to your thoughts!

Best regards,

Martijn Visser | Product Manager

mart...@ververica.com

[1] https://lists.apache.org/thread/bywh947r2f5hfocxq598zhyh06zhksrm




Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time


Re: [VOTE] Release flink-shaded 15.0, release candidate #1

2022-01-05 Thread Matthias Pohl
+1 (binding)

- Verified the checksums
- Checked the website PR
- Diff'd the NOTICE files comparing it to 14.0 to check for anything
suspicious
- Built flink-shaded

Thanks,
Chesnay


On Tue, Dec 14, 2021 at 9:57 AM Chesnay Schepler  wrote:

> Hi everyone,
> Please review and vote on the release candidate #1 for the version 15.0,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release to be deployed to dist.apache.org
> [2], which are signed with the key with fingerprint C2EED7B111D464BA [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag [5],
> * website pull request listing the new release [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Release Manager
>
> [1]
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350665
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-shaded-15.0-rc1/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1458
> [5] https://github.com/apache/flink-shaded/releases/tag/release-15.0-rc1
> [6] https://github.com/apache/flink-web/pull/490
>


Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)

2022-01-05 Thread Becket Qin
Hi Till,

Thanks for the prompt reply. Like you said, we are indeed using the dynamic
CEP pattern use case to test the existing primitives in Flink to see if
they can meet the requirements. I fully understand the concern of
exposing OC as a user interface. Meanwhile I see CEP dynamic patterns as a
good opportunity to battle test and enhance the OC as a user facing control
plane which is currently missing. After all, there is no better person than
ourselves to try it out first.

It is not clear to me whether it is worth continuing the effort of
supporting dynamic CEP pattern without concluding the control plane
discussion. Let's say we have a CEP job reading from Kafka. To make this
work with side-input, a few things need to be done.

   1. In order to support dynamic patterns, users would create another
   Kafka topic as side-input to receive dynamic patterns.
   2. In order to insert dynamic patterns, users would use a separate web
   server that is provided by us as a separate tool. The web server takes http
   requests and sends dynamic pattern records to Kafka via a Kafka sink (using
   a KafkaProducer is likely simpler here, though).
   3. Regarding querying the running dynamic patterns, given Kafka is not
   queryable, users would probably introduce a database and insert the
   patterns there so they can query the running patterns. Maybe this could be
   done by the CEP operator side-output, so there is less chance of
   inconsistency between Kafka and the database.
   4. If the Flink job is to be stopped, the dynamic pattern Kafka topic
   needs to be deleted, the companion web server also needs to be stopped
   (assuming it is not shared with other CEP jobs), and the database table
   storing the dynamic pattern needs to be dropped.
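
To make the above concrete, here is a rough sketch of what steps 1 and 2 imply
on the job side with today's DataStream API, using broadcast state to fan the
pattern updates out to all parallel instances. Everything specific in it is
made up for illustration (topic names, the bootstrap address, the
"patternId|patternSpec" record format, and the trivial substring matching that
stands in for compiling and running a real CEP pattern):

import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicPatternSideInputSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The regular event stream of the CEP job.
        KafkaSource<String> events = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("events")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Step 1: a second topic that carries serialized pattern definitions.
        KafkaSource<String> patternUpdates = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("cep-pattern-updates")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> eventStream =
                env.fromSource(events, WatermarkStrategy.noWatermarks(), "events");
        DataStream<String> patternStream =
                env.fromSource(patternUpdates, WatermarkStrategy.noWatermarks(), "pattern-updates");

        // Broadcast state holds patternId -> patternSpec on every parallel instance.
        final MapStateDescriptor<String, String> patternsDescriptor = new MapStateDescriptor<>(
                "patterns", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        BroadcastStream<String> broadcastPatterns = patternStream.broadcast(patternsDescriptor);

        eventStream
                .connect(broadcastPatterns)
                .process(new BroadcastProcessFunction<String, String, String>() {

                    @Override
                    public void processElement(
                            String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // Evaluate the event against the currently known patterns. A real
                        // implementation would run compiled NFAs here, not substring checks.
                        for (Map.Entry<String, String> pattern :
                                ctx.getBroadcastState(patternsDescriptor).immutableEntries()) {
                            if (event.contains(pattern.getValue())) {
                                out.collect("pattern " + pattern.getKey() + " matched: " + event);
                            }
                        }
                    }

                    @Override
                    public void processBroadcastElement(
                            String update, Context ctx, Collector<String> out) throws Exception {
                        // Step 2: the companion tool produced "patternId|patternSpec" records.
                        String[] parts = update.split("\\|", 2);
                        ctx.getBroadcastState(patternsDescriptor).put(parts[0], parts[1]);
                    }
                })
                .print();

        env.execute("dynamic-pattern-side-input-sketch");
    }
}

Even this toy version shows the moving parts involved: a second topic, an
agreed-upon update format, and custom code in the operator to interpret the
updates.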

Please correct me if I misunderstood something, but this seems quite
involved. Moreover, all the work here is going to be thrown away after we
have OC as a decent user facing control plane in place. And we will likely
have a backwards incompatible API change here. Given that, I am wondering
if we should wait until the OC discussion concludes before moving on with
the dynamic patterns?

Thanks,

Jiangjie (Becket) Qin



On Wed, Jan 5, 2022 at 5:53 PM Till Rohrmann  wrote:

> Thanks for the detailed explanation Becket.
>
> Do you think that an additional dependency is a deal breaker for people to
> use dynamic CEP patterns? At the very least people have to operate some
> kind of storage/queue system from which the CEP job can read anyway. Maybe
> it could be good enough to provide a REST endpoint (as a separate tool)
> that can be instantiated with a Flink sink to ingest REST requests
> into some queue. A general concern I have is that making the JM a REST
> ingestion point for data will push another responsibility to Flink and
> increase the surface area further.
>
> For how to handle data that cannot be processed by patterns, I think there
> also exist other solutions. I could imagine that users could define
> different failover strategies. E.g. one could simply ignore the record, the
> pattern could get deactivated on the affected TM or the processing fails.
>
> Maybe we are coupling the dynamic CEP pattern effort too much on where the
> new patterns come from. Maybe we can split these efforts into supporting
> dynamic CEP patterns on the TM reading from some source and then fork off
> the discussion about introducing a user controlled control plane to Flink.
> That way we wouldn't block this effort and could discuss more about the
> exact properties such a user control plane would need to have. What do you
> think?
>
> Cheers,
> Till
>
> On Wed, Jan 5, 2022 at 7:18 AM Becket Qin  wrote:
>
> > Hi Till,
> >
> > Thanks for the comments and questions. To be clear, I am not saying that
> > the side-input stream does not work for the dynamic pattern update use
> > case. But I think OC is a better solution. The design goal for CEP
> dynamic
> > pattern is not only to make it work, but also to make it user friendly and
> > extensible. So as a user, I expect the following:
> >
> > - Use CEP with dynamic pattern update without depending on external
> > systems. e.g. updating the pattern by directly talking to the Flink job
> > itself.
> > - Some sort of isolation between patterns. e.g. a pattern update failure
> > won't cause other running patterns to fail.
> > - Easy to query the currently running patterns in the Flink job.
> > - Extensible to add new features. e.g. apply some pattern to a subset of
> > subtasks. Disable dynamic pattern update during a service window, etc.
> >
> > It looks to me that OC is a more promising way to achieve the above.
> >
> > Please also see the reply to your questions inline below.
> >
> > > You mentioned that a TM might find out that a pattern is invalid and
> then
> > > it could use the 2-way communication with the JM to tell the other TMs
> > that
> > > the pattern is invalid. How exactly would a TM detect that a pattern is
> > > invalid at 

Re: [DISCUSS] FLIP-206: Support PyFlink Runtime Execution in Thread Mode

2022-01-05 Thread Xingbo Huang
Hi Till and Thomas,

Thanks a lot for joining the discussion.

For Till:

>>> Is the slower performance currently the biggest pain point for our
Python users? What else are our Python users mainly complaining about?

PyFlink users are most concerned about two parts, one is better usability,
the other is performance. Users often make some benchmarks when they
investigate pyflink[1][2] at the beginning to decide whether to use
PyFlink. The performance of a PyFlink job depends on two parts, one is the
overhead of the PyFlink framework, and the other is the Python function
complexity implemented by the user. In the Python ecosystem, there are many
libraries and tools that can help Python users improve the performance of
their custom functions, such as pandas[3], numba[4] and cython[5]. So we
hope that the framework overhead of PyFlink itself can also be reduced.

>>> Concerning the proposed changes, are there any changes required on the
runtime side (changes to Flink)? How will the deployment and memory
management be affected when using the thread execution mode?

The changes on PyFlink Runtime mentioned here are actually only
modifications of PyFlink custom Operators, such as
PythonScalarFunctionOperator[6], which won't affect deployment and memory
management.

>>> One more question that came to my mind: How much performance
improvement do we gain on a real-world Python use case? Were the
measurements more like micro benchmarks where the Python UDF was called w/o
the overhead of Flink? I would just be curious how much the Python
component contributes to the overall runtime of a real world job. Do we
have some data on this?

The last figure I put in FLIP is the performance comparison of three real
Flink Stream Sql Jobs. They are a Java UDF job, a Python UDF job in Process
Mode, and a Python UDF job in Thread Mode. The calculated value of QPS is
the end-to-end Flink job execution result. As shown in the performance
comparison chart, the performance of Python udf with the same function can
often only reach 20% of Java udf, so the performance of python udf will
often become the performance bottleneck in a PyFlink job.

For Thomas:

The first time that I realized the framework overhead of various IPC
(socket, grpc, shared memory) cannot be ignored in some scenarios is due to
an image algorithm prediction job of PyFlink. Its input parameters are a
series of huge image binary arrays, and its data size is bigger than 1G.
The performance overhead of serialization/deserialization has become an
important part of its poor performance. Although this job is a bit extreme,
through measurement, we did find the impact of the
serialization/deserialization overhead caused by larger size parameters on
the performance of the IPC framework.

>>> As I understand it, you measured the difference in throughput for UPPER
between process and embedded mode and the difference is 50% increased
throughput?

This 50% is the result when the data size is less than 100byte. When the
data size reaches 1k, the performance of the Embedded Mode will reach about
3.5 times the performance of the Process Mode shown in the FLIP. When the
data reaches 1M, the performance of Embedded Mode can reach 5 times the
performance of the Process Mode. The biggest difference here is that in
Embedded Mode, input/result data does not need to be
serialized/deserialized.

>>> Is that a typical UDF in your usage?

The reason for choosing UPPER is that a simpler udf implementation can make
it easier to evaluate the performance of different execution modes.

>>> What do you observe when the function becomes more complex?

We can analyze the QPS of the framework (process mode or embedded mode) and
the QPS of the UDF calculation logic separately. A more complex UDF means
that it is a UDF with a smaller QPS. The main factors that affect the
framework QPS are data type of parameters, number of parameters and size of
parameters, which will greatly affect the serialization/deserialization
overhead in Process Mode.

The purpose of introducing thread mode is not to replace Process mode, but
to supplement Python udf usage scenarios such as cep and join, and some
scenarios where higher performance is pursued. Compared with Thread mode,
Process Mode has better isolation, which can solve the limitation of thread
mode in some scenarios such as session mode.

[1] https://www.mail-archive.com/user@flink.apache.org/msg42760.html
[2] https://www.mail-archive.com/user@flink.apache.org/msg44975.html
[3] https://pandas.pydata.org/
[4] https://numba.pydata.org/
[5] https://cython.org/
[6]
https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java

Best,
Xingbo

Thomas Weise  wrote on Tue, Jan 4, 2022 at 04:23:

> Interesting discussion. It caught my attention because I was also
> interested in the Beam fn execution overhead a few years ago.
>
> We found back then that while in theory the fn protocol 

Re: [VOTE] Apache Flink ML Release 2.0.0, release candidate #3

2022-01-05 Thread Dian Fu
+1 (binding)

- Verified the checksum and signature
- Build the Java code and also run the tests using `mvn clean verify`
- Checked the NOTICE file
- Pip installed the python package in MacOS under Python 3.7
- Reviewed the flink-web PR

Regards,
Dian

On Tue, Jan 4, 2022 at 12:25 AM Dong Lin  wrote:

> Thank you Till for the vote.
>
> FYI, if you were not able to use `pip3 install` to verify the python source
> release, that might be because you are using Python 3.9 (or later versions)
> with pip3. The issue could be fixed by using e.g. Python 3.8.
>
> The supported python versions are documented in flink-ml-python/README.md
> but we forgot to enforce this check in the setup.py previously. We have
> merged PR (link ) to improve
> the error message in the master branch.
>
>
>
>
> On Mon, Jan 3, 2022 at 10:10 PM Till Rohrmann 
> wrote:
>
> > +1 (binding)
> >
> > - Checked the checksums and signatures
> > - Built java part from source release
> > - Ran all Java tests
> > - Checked the blog post PR
> >
> > What I did not manage to do is to build the Python part locally. I assume
> > that this was due to my local Python setup. Maybe somebody else can
> double
> > check this part if not already done.
> >
> > Cheers,
> > Till
> >
> > On Fri, Dec 31, 2021 at 10:16 AM Xingbo Huang 
> wrote:
> >
> > > +1 (non-binding)
> > >
> > > - Verified checksums and signatures
> > > - Pip install the apache-flink-ml package
> > > - Run the apache-flink-ml tests (We will add more python examples in
> the
> > > next release https://issues.apache.org/jira/browse/FLINK-25497).
> > >
> > > Best,
> > > Xingbo
> > >
> > > Zhipeng Zhang  wrote on Fri, Dec 31, 2021 at 15:01:
> > >
> > > > +1 (non-binding)
> > > >
> > > > - Verified that the checksums and GPG files match the corresponding
> > > release
> > > > files
> > > > - Verified that the source distributions do not contain any binaries
> > > > - Built the source distribution with Maven to ensure all source files
> > > have
> > > > Apache headers
> > > > - Verified that all POM files point to the same version
> > > > - Verified that the README.md file does not have anything unexpected
> > > > - Verified the NOTICE and LICENSE follows the rules
> > > > - Checked JIRA release notes
> > > > - Checked source code tag "release-2.0.0-rc3"
> > > >
> > > >
> > > > Dong Lin  wrote on Fri, Dec 31, 2021 at 09:09:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > - Verified that the checksums and GPG files match the corresponding
> > > > release
> > > > > files
> > > > > - Verified that the source distributions do not contain any
> binaries
> > > > > - Built the source distribution with Maven to ensure all source
> files
> > > > have
> > > > > Apache headers
> > > > > - Verified that all POM files point to the same version
> > > > > - Verified that the README.md file does not have anything
> unexpected
> > > > > - Verified the NOTICE and LICENSE follows the rules specified in
> the
> > > wiki
> > > > > <
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Verifying+a+Flink+ML+Release
> > > > > >
> > > > > .
> > > > > - Checked JIRA release notes
> > > > > - Checked source code tag "release-2.0.0-rc2"
> > > > > - Checked flink-web PR
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Dec 31, 2021 at 1:24 AM Yun Gao 
> > wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > >
> > > > > >
> > > > > > Please review and vote on the release candidate #3 for the
> version
> > > > 2.0.0
> > > > > > of Apache Flink ML,
> > > > > >
> > > > > > as follows:
> > > > > >
> > > > > > [ ] +1, Approve the release
> > > > > >
> > > > > > [ ] -1, Do not approve the release (please provide specific
> > comments)
> > > > > >
> > > > > >
> > > > > >
> > > > > > **Testing Guideline**
> > > > > >
> > > > > >
> > > > > >
> > > > > > You can find here [1] a page in the project wiki on instructions
> > for
> > > > > > testing. To cast a vote, it is not necessary to perform all
> listed
> > > > > checks,
> > > > > > but please mention which checks you have performed when voting.
> > > > > >
> > > > > >
> > > > > >
> > > > > > **Release Overview**
> > > > > >
> > > > > >
> > > > > >
> > > > > > As an overview, the release consists of the following:
> > > > > >
> > > > > > a) Flink ML source release to be deployed to dist.apache.org
> > > > > >
> > > > > > b) Flink ML Python source distributions to be deployed to PyPI
> > > > > >
> > > > > > c) Maven artifacts to be deployed to the Maven Central Repository
> > > > > >
> > > > > >
> > > > > >
> > > > > > **Staging Areas to Review**
> > > > > >
> > > > > >
> > > > > >
> > > > > > The staging areas containing the above mentioned artifacts are as
> > > > > follows,
> > > > > > for your review:
> > > > > >
> > > > > > * All artifacts for a) and b) can be found in the corresponding
> dev
> > > > > > repository at dist.apache.org [2], which are signed with the key
> > > with
> > > > > > fingerprint 

[jira] [Created] (FLINK-25531) The test testRetryCommittableOnRetriableError takes one hour before completing successfully

2022-01-05 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-25531:
--

 Summary: The test testRetryCommittableOnRetriableError takes one 
hour before completing successfully
 Key: FLINK-25531
 URL: https://issues.apache.org/jira/browse/FLINK-25531
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.15.0
Reporter: Martijn Visser


When working on https://issues.apache.org/jira/browse/FLINK-25504 I noticed 
that the {{test_ci kafka_gelly}} run took more than 1:30 hours. 

When looking at the logs for this PR 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=28866&view=logs&j=ae4f8708-9994-57d3-c2d7-b892156e7812&t=c5f0071e-1851-543e-9a45-9ac140befc32
 I noticed that 
{{org.apache.flink.connector.kafka.sink.KafkaCommitterTest.testRetryCommittableOnRetriableError}}
 is running for an hour:


{code:java}
13:22:31,145 [kafka-producer-network-thread | producer-transactionalId] WARN  
org.apache.kafka.clients.NetworkClient   [] - [Producer 
clientId=producer-transactionalId, transactionalId=transactionalId] Connection 
to node -1 (localhost/127.0.0.1:1) could not be established. Broker may not be 
available.
13:22:31,145 [kafka-producer-network-thread | producer-transactionalId] WARN  
org.apache.kafka.clients.NetworkClient   [] - [Producer 
clientId=producer-transactionalId, transactionalId=transactionalId] Bootstrap 
broker localhost:1 (id: -1 rack: null) disconnected
13:22:31,347 [kafka-producer-network-thread | producer-transactionalId] WARN  
org.apache.kafka.clients.NetworkClient   [] - [Producer 
clientId=producer-transactionalId, transactionalId=transactionalId] Connection 
to node -1 (localhost/127.0.0.1:1) could not be established. Broker may not be 
available.

...

14:22:29,472 [kafka-producer-network-thread | producer-transactionalId] WARN  
org.apache.kafka.clients.NetworkClient   [] - [Producer 
clientId=producer-transactionalId, transactionalId=transactionalId] Bootstrap 
broker localhost:1 (id: -1 rack: null) disconnected
14:22:30,324 [kafka-producer-network-thread | producer-transactionalId] WARN  
org.apache.kafka.clients.NetworkClient   [] - [Producer 
clientId=producer-transactionalId, transactionalId=transactionalId] Connection 
to node -1 (localhost/127.0.0.1:1) could not be established. Broker may not be 
available.
14:22:30,324 [kafka-producer-network-thread | producer-transactionalId] WARN  
org.apache.kafka.clients.NetworkClient   [] - [Producer 
clientId=producer-transactionalId, transactionalId=transactionalId] Bootstrap 
broker localhost:1 (id: -1 rack: null) disconnected
14:22:31,144 [main] INFO  
org.apache.kafka.clients.producer.KafkaProducer  [] - [Producer 
clientId=producer-transactionalId, transactionalId=transactionalId] Proceeding 
to force close the producer since pending requests could not be completed 
within timeout 360 ms.
14:22:31,145 [kafka-producer-network-thread | producer-transactionalId] INFO  
org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer 
clientId=producer-transactionalId, transactionalId=transactionalId] Transiting 
to fatal error state due to org.apache.kafka.common.KafkaException: The 
producer closed forcefully
14:22:31,145 [kafka-producer-network-thread | producer-transactionalId] INFO  
org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer 
clientId=producer-transactionalId, transactionalId=transactionalId] Transiting 
to fatal error state due to org.apache.kafka.common.KafkaException: The 
producer closed forcefully
14:22:31,148 [main] INFO  
org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension [] 
- 

Test 
org.apache.flink.connector.kafka.sink.KafkaCommitterTest.testRetryCommittableOnRetriableError
 successfully run.

{code}




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25530) Support Pulsar source connector in Python DataStream API.

2022-01-05 Thread Ada Wong (Jira)
Ada Wong created FLINK-25530:


 Summary: Support Pulsar source connector in Python DataStream API.
 Key: FLINK-25530
 URL: https://issues.apache.org/jira/browse/FLINK-25530
 Project: Flink
  Issue Type: New Feature
  Components: API / Python
Affects Versions: 1.14.2
Reporter: Ada Wong


Flink already supports a Pulsar source connector:

https://issues.apache.org/jira/browse/FLINK-20726



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-201: Persist local state in working directory

2022-01-05 Thread David Morávek
+1 the general direction here seems pretty solid

D.


On Wed, Jan 5, 2022 at 11:57 AM Till Rohrmann  wrote:

> If there is no other larger feedback, I would start the vote soonish.
>
> Cheers,
> Till
>
> On Thu, Dec 30, 2021 at 4:28 PM Till Rohrmann 
> wrote:
>
> > Hi David,
> >
> > Thanks for your feedback.
> >
> > With the graceful shutdown I mean a way to stop the TaskManager and to
> > clean up the working directory. At the moment, I think we always kill the
> > process via SIGTERM or SIGKILL. This won't clean up the working directory
> > because it could also originate from a process failure. I think what
> Niklas
> > does is to introduce a signal handler to react to SIGTERM to disconnect
> > from the JobMaster.
> >
> > You are right that by default Flink will now set the RocksDB directory to
> > the working temp directory. Before it defaulted to the spilling
> > directories. I think this is not a problem because users can still
> manually
> > configure multiple RocksDB directories via
> state.backend.rocksdb.localdir.
> > Moreover, I am not sure how well this mechanism works in practice. Flink
> > will simply iterate through the directories when creating new RocksDB
> state
> > backends w/o a lot of smartness. If one of the directories is full, then
> > Flink won't use another one but simply fail.
> >
> > I do see the point of a proper serialization format and I agree that we
> > should eventually implement it. My reasoning was that the PR is already
> > quite big and I would prefer getting it in and then tackling this problem
> > as a follow-up instead of increasing the scope of the changes further
> > because the serialization format is not required for this feature
> (strictly
> > speaking). I hope that this makes sense.
> >
> > I will also respond to your PR comments.
> >
> > Cheers,
> > Till
> >
> > On Thu, Dec 30, 2021 at 4:00 PM David Morávek  wrote:
> >
> >> Hi Till,
> >>
> >> thanks for drafting the FLIP, it looks really good. I did a quick pass
> >> over
> >> the PR and it seems to be heading in the right direction.
> >>
> >> It might be required to introduce a graceful shutdown of the
> TaskExecutor
> >> > in order to support proper cleanup of resources.
> >> >
> >>
> >> This is actively being worked on by Niklas in FLINK-25277 [1].
> >>
> >> In the PR, I've seen that you're also replacing directories for storing
> >> the
> >> local state with the working directory. Should this be a concern? Is for
> >> example rocksdb able to leverage multiple mount paths for spreading the
> >> load?
> >>
> >> I'd also be in favor of introducing a proper (evolving) serialization
> >> format right away instead of the Java serialization, but no hard
> feelings
> >> if we don't.
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-25277
> >>
> >> Best,
> >> D.
> >>
> >> On Wed, Dec 29, 2021 at 4:58 PM Till Rohrmann 
> >> wrote:
> >>
> >> > I've created draft PR for the desired changes [1]. It might be easier
> to
> >> > take a look at than the branch.
> >> >
> >> > [1] https://github.com/apache/flink/pull/18237
> >> >
> >> > Cheers,
> >> > Till
> >> >
> >> > On Tue, Dec 28, 2021 at 3:22 PM Till Rohrmann 
> >> > wrote:
> >> >
> >> > > Hi everyone,
> >> > >
> >> > > I would like to start a discussion about using the working directory
> >> to
> >> > > persist local state for faster recovery (FLIP-201) [1]. Persisting
> the
> >> > > local state will be beneficial if a crashed process is restarted
> with
> >> the
> >> > > same working directory. In this case, Flink does not have to
> download
> >> the
> >> > > state artifacts again and can recover locally.
> >> > >
> >> > > A POC can be found here [2].
> >> > >
> >> > > Looking forward to your feedback.
> >> > >
> >> > > [1] https://cwiki.apache.org/confluence/x/wJuqCw
> >> > > [2] https://github.com/tillrohrmann/flink/tree/FLIP-201
> >> > >
> >> > > Cheers,
> >> > > Till
> >> > >
> >> >
> >>
> >
>


Re: [DISCUSS] FLIP-201: Persist local state in working directory

2022-01-05 Thread Till Rohrmann
If there is no other larger feedback, I would start the vote soonish.

Cheers,
Till

On Thu, Dec 30, 2021 at 4:28 PM Till Rohrmann  wrote:

> Hi David,
>
> Thanks for your feedback.
>
> With the graceful shutdown I mean a way to stop the TaskManager and to
> clean up the working directory. At the moment, I think we always kill the
> process via SIGTERM or SIGKILL. This won't clean up the working directory
> because it could also originate from a process failure. I think what Niklas
> does is to introduce a signal handler to react to SIGTERM to disconnect
> from the JobMaster.
>
> You are right that by default Flink will now set the RocksDB directory to
> the working temp directory. Before it defaulted to the spilling
> directories. I think this is not a problem because users can still manually
> configure multiple RocksDB directories via state.backend.rocksdb.localdir.
> Moreover, I am not sure how well this mechanism works in practice. Flink
> will simply iterate through the directories when creating new RocksDB state
> backends w/o a lot of smartness. If one of the directories is full, then
> Flink won't use another one but simply fail.
>
> I do see the point of a proper serialization format and I agree that we
> should eventually implement it. My reasoning was that the PR is already
> quite big and I would prefer getting it in and then tackling this problem
> as a follow-up instead of increasing the scope of the changes further
> because the serialization format is not required for this feature (strictly
> speaking). I hope that this makes sense.
>
> I will also respond to your PR comments.
>
> Cheers,
> Till
>
> On Thu, Dec 30, 2021 at 4:00 PM David Morávek  wrote:
>
>> Hi Till,
>>
>> thanks for drafting the FLIP, it looks really good. I did a quick pass
>> over
>> the PR and it seems to be heading in the right direction.
>>
>> It might be required to introduce a graceful shutdown of the TaskExecutor
>> > in order to support proper cleanup of resources.
>> >
>>
>> This is actively being worked on by Niklas in FLINK-25277 [1].
>>
>> In the PR, I've seen that you're also replacing directories for storing
>> the
>> local state with the working directory. Should this be a concern? Is for
>> example rocksdb able to leverage multiple mount paths for spreading the
>> load?
>>
>> I'd also be in favor of introducing a proper (evolving) serialization
>> format right away instead of the Java serialization, but no hard feelings
>> if we don't.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-25277
>>
>> Best,
>> D.
>>
>> On Wed, Dec 29, 2021 at 4:58 PM Till Rohrmann 
>> wrote:
>>
>> > I've created draft PR for the desired changes [1]. It might be easier to
>> > take a look at than the branch.
>> >
>> > [1] https://github.com/apache/flink/pull/18237
>> >
>> > Cheers,
>> > Till
>> >
>> > On Tue, Dec 28, 2021 at 3:22 PM Till Rohrmann 
>> > wrote:
>> >
>> > > Hi everyone,
>> > >
>> > > I would like to start a discussion about using the working directory
>> to
>> > > persist local state for faster recovery (FLIP-201) [1]. Persisting the
>> > > local state will be beneficial if a crashed process is restarted with
>> the
>> > > same working directory. In this case, Flink does not have to download
>> the
>> > > state artifacts again and can recover locally.
>> > >
>> > > A POC can be found here [2].
>> > >
>> > > Looking forward to your feedback.
>> > >
>> > > [1] https://cwiki.apache.org/confluence/x/wJuqCw
>> > > [2] https://github.com/tillrohrmann/flink/tree/FLIP-201
>> > >
>> > > Cheers,
>> > > Till
>> > >
>> >
>>
>


Re: [DISCUSS] Creating an external connector repository

2022-01-05 Thread Martijn Visser
Hi everyone,

I wanted to summarise this email thread and see if there are any open items
that still need to be discussed before we can finalise the discussion:

1. About having multiple connectors in one repo or each connector in its own
repository

As explained by @Arvid Heise  we ultimately propose to
have a single repository per connector, which seems to be favoured in the
community.

2. About having the connector repositories under ASF or not.

The consensus is that all connectors would remain under the ASF.

I think we can categorise the questions or concerns that are brought
forward as follows:

3. How would we set up the testing?

We need to make sure that we provide a proper testing framework, which
means that we provide a public Source- and Sink testing framework. As
mentioned extensively in the thread, we need to make sure that the
necessary interfaces are properly annotated and at least @PublicEvolving.
This also includes the test infrastructure, like MiniCluster. For the
latter, we don't know exactly yet how to balance having publicly available
test infrastructure vs being able to iterate inside of Flink, but we can
all agree this has to be solved.
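
To illustrate what that means in practice, the externally used test interfaces
would carry an explicit stability guarantee along these lines. The interface
and method below are made up for the example; only the @PublicEvolving
annotation from flink-annotations is real:

import java.time.Duration;
import java.util.List;

import org.apache.flink.annotation.PublicEvolving;

/** Hypothetical connector-test interface that external connector repos would compile against. */
@PublicEvolving
public interface ExternalSystemReader<T> extends AutoCloseable {

    /** Polls records that the connector under test has written to the external system. */
    List<T> poll(Duration timeout);
}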

For testing infrastructure, we would like to use Github Actions. In the
current state, it probably makes sense for a connector repo to follow the
branching strategy of Flink. That will ensure a match between the released
connector and Flink version. This should change when all the Flink
interfaces have stabilised so you can use a connector with multiple Flink
versions. That means that we should have a nightly build test for:

- The `main` branch of the connector (which would be the unreleased
version) against the `master` branch of Flink (the unreleased version of
Flink).
- Any supported `release-X.YY` branch of the connector against the
`release-X.YY` branch of Flink.

We should also have smoke-test E2E tests in Flink (one for DataStream,
one for Table, one for SQL, one for Python) which load all the connectors
and run an arbitrary test (post data to the source, load it into Flink, sink
the output and verify that the output is as expected).

4. How would we integrate documentation?

Documentation for a connector should probably end up in the connector
repository. The Flink website should contain one entry point to all connectors
(so not the current approach where we have connectors per DataStream API,
Table API etc). Each connector's documentation should end up as one menu item
under connectors, containing all necessary information for all DataStream,
Table, SQL and Python implementations.

5. Which connectors should end up in the external connector repo?

I'll open up a separate thread on this topic to have a parallel discussion
on that. We should reach consensus on both threads before we can move
forward on this topic as a whole.

Best regards,

Martijn

On Fri, 10 Dec 2021 at 04:47, Thomas Weise  wrote:

> +1 for repo per connector from my side also
>
> Thanks for trying out the different approaches.
>
> Where would the common/infra pieces live? In a separate repository
> with its own release?
>
> Thomas
>
> On Thu, Dec 9, 2021 at 12:42 PM Till Rohrmann 
> wrote:
> >
> > Sorry if I was a bit unclear. +1 for the single repo per connector
> approach.
> >
> > Cheers,
> > Till
> >
> > On Thu, Dec 9, 2021 at 5:41 PM Till Rohrmann 
> wrote:
> >
> > > +1 for the single repo approach.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Dec 9, 2021 at 3:54 PM Martijn Visser 
> > > wrote:
> > >
> > >> I also agree that it feels more natural to go with a repo for each
> > >> individual connector. Each repository can be made available at
> > >> flink-packages.org so users can find them, next to referring to them
> in
> > >> documentation. +1 from my side.
> > >>
> > >> On Thu, 9 Dec 2021 at 15:38, Arvid Heise  wrote:
> > >>
> > >> > Hi all,
> > >> >
> > >> > We tried out Chesnay's proposal and went with Option 2.
> Unfortunately,
> > >> we
> > >> > experienced tough nuts to crack and feel like we hit a dead end:
> > >> > - The main pain point with the outlined Frankensteinian connector
> repo
> > >> is
> > >> > how to handle shared code / infra code. If we have it in some
> 
> > >> > branch, then we need to merge the common branch in the connector
> branch
> > >> on
> > >> > update. However, it's unclear to me how improvements in the common
> > >> branch
> > >> > that naturally appear while working on a specific connector go back
> into
> > >> > the common branch. You can't use a pull request from your branch or
> else
> > >> > your connector code would poison the connector-less common branch.
> So
> > >> you
> > >> > would probably manually copy the files over to a common branch and
> > >> create a
> > >> > PR branch for that.
> > >> > - A weird solution could be to have the common branch as a
> submodule in
> > >> the
> > >> > repo itself (if that's even possible). I'm sure that this setup
> would
> > >> blow
> > >> > up the minds of all 

Re: [DISCUSS] Deprecate MapR FS

2022-01-05 Thread Till Rohrmann
+1 for dropping the MapR FS.

Cheers,
Till

On Wed, Jan 5, 2022 at 10:11 AM Martijn Visser 
wrote:

> Hi everyone,
>
> Thanks for your input. I've checked the MapR implementation and it has no
> annotation at all. Given the circumstances that we thought that MapR was
> already dropped, I would propose to immediately remove MapR in Flink 1.15
> instead of first marking it as deprecated and removing it in Flink 1.16.
>
> Please let me know what you think.
>
> Best regards,
>
> Martijn
>
> On Thu, 9 Dec 2021 at 17:27, David Morávek  wrote:
>
>> +1, agreed with Seth's reasoning. There has been no real activity in MapR
>> FS module for years [1], so any remaining users should be fine using
>> the jars from older Flink versions for quite some time
>>
>> [1]
>> https://github.com/apache/flink/commits/master/flink-filesystems/flink-mapr-fs
>>
>> Best,
>> D.
>>
>> On Thu, Dec 9, 2021 at 4:28 PM Konstantin Knauf 
>> wrote:
>>
>>> +1 (what Seth said)
>>>
>>> On Thu, Dec 9, 2021 at 4:15 PM Seth Wiesman  wrote:
>>>
>>> > +1
>>> >
>>> > I actually thought we had already dropped this FS. If anyone is still
>>> > relying on it in production, the file system abstraction in Flink has
>>> been
>>> > incredibly stable over the years. They should be able to use the 1.14
>>> MapR
>>> > FS with later versions of Flink.
>>> >
>>> > Seth
>>> >
>>> > On Wed, Dec 8, 2021 at 10:03 AM Martijn Visser 
>>> > wrote:
>>> >
>>> >> Hi all,
>>> >>
>>> >> Flink supports multiple file systems [1] which includes MapR FS. MapR
>>> as
>>> >> a company doesn't exist anymore since 2019, the technology and
>>> intellectual
>>> >> property has been sold to Hewlett Packard.
>>> >>
>>> >> I don't think that there's anyone who's using MapR anymore and
>>> therefore
>>> >> I think it would be good to deprecate this for Flink 1.15 and then
>>> remove
>>> >> it in Flink 1.16. Removing this from Flink will slightly shrink the
>>> >> codebase and CI runtime.
>>> >>
>>> >> I'm also cross posting this to the User mailing list, in case there's
>>> >> still anyone who's using MapR.
>>> >>
>>> >> Best regards,
>>> >>
>>> >> Martijn
>>> >>
>>> >> [1]
>>> >>
>>> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/
>>> >>
>>> >
>>>
>>> --
>>>
>>> Konstantin Knauf
>>>
>>> https://twitter.com/snntrable
>>>
>>> https://github.com/knaufk
>>>
>>


Re: [DISCUSS] JUnit 5 Migration

2022-01-05 Thread Ryan Skraba
Hello!  I can help out with the effort -- I've got a bit of experience with
JUnit 4 and 5 migration, and it looks like even with the AssertJ scripts
there's going to be a lot of mechanical and manual work to be done.  The
migration document looks pretty comprehensive!

For the remaining topics to be discussed:

I don't have a strong opinion on what to do about parameterized tests that
use inheritance, although Jing Ge's proposal sounds reasonable and easy to
follow.  I wouldn't be worried about temporarily redundant test code too
much if it simplifies getting us into a good final state, especially since
redundant code would be easy to spot and remove when we get rid of JUnit 4
artifacts.

Getting rid of PowerMock sounds fine to me.

I don't think it's necessary to have a common author for commits, given
that commits will have the [JUnit5 migration] tag.  I guess my preference
would be to have "one or a few" commits per module, merged progressively.

Is there an existing branch on a repo with some of the modules already
migrated?

All my best, Ryan

On Fri, Dec 17, 2021 at 5:19 PM Jing Ge  wrote:

> Thanks Hang and Qingsheng for your effort and starting this discussion. As
> additional information, I've created an umbrella ticket(
> https://issues.apache.org/jira/browse/FLINK-25325). It is recommended to
> create all JUnit5 migration related tasks under it, So we could track the
> whole migration easily.
>
> I think, for the parameterized test issue, the major problem is that, on
> one hand, JUnit 5 has its own approach to parameterized tests and it
> does not allow using parameterized fixtures at class level. This is a huge
> difference compared to JUnit4. On the other hand, currently, there are many
> cross module test class inheritances, which means that the migration could
> not be done in one shot. It must be allowed to run JUnit4 and JUnit5 tests
> simultaneously during the migration process. As long as there are sub
> parameterized test classes in JUnit4, it will be risky to migrate the
> parent class to JUnit5. And if the parent class has to stick with JUnit4
> during the migration, any migrated JUnit5 subclass might need to duplicate
> the test methods defined in the parent class. In this case, I would prefer
> to duplicate the test methods with different names in the parent class for
> both JUnit4 and JUnit5 only during the migration process as temporary
> solution and remove the test methods for JUnit4 once the migration process
> is finished, i.e. when all subclasses are JUnit5 tests. It is a trade-off
> solution. Hopefully we could find another better solution during the
> discussion.
>
> Speaking of replacing @Test with @TestTemplate, since I did read all tests,
> do we really need to replace all of them with @TestTemplate w.r.t. the
> parameterized tests?
>
> For the PowerMock tests, it is a good opportunity to remove them.
>
> best regards
> Jing
>
> On Fri, Dec 17, 2021 at 2:14 PM Hang Ruan  wrote:
>
> > Hi, all,
> >
> > Apache Flink is using JUnit for unit tests and integration tests widely
> in
> > the project, however, it binds to the legacy JUnit 4 deeply. It is
> > important to migrate existing cases to JUnit 5 in order to avoid
> splitting
> > the project into different JUnit versions.
> >
> > Qingsheng Ren and I have conducted some trials about the JUnit 5
> migration,
> > but there are too many modules that need to migrate. We would like to get
> > more help from the community. It is planned to migrate module by module,
> > and a JUnit 5 migration guide
> > <
> >
> https://docs.google.com/document/d/1514Wa_aNB9bJUen4xm5uiuXOooOJTtXqS_Jqk9KJitU/edit?usp=sharing
> > >[1]
> > has been provided to new helpers on the cooperation method and how to
> > migrate.
> >
> > There are still some problem to discuss:
> >
> > 1. About parameterized test:
> >
> > Some test classes inherit from other base test classes. We have discussed
> > different situations in the guide, but the situation where a
> parameterized
> > test subclass inherits from a non parameterized parent class has not been
> > resolved.
> >
> > In JUnit 4, the parent test class always has some test cases annotated by
> > @Test. And  the parameterized subclass will run these test cases in the
> > parent class in a parameterized way.
> >
> > In JUnit 5, if we want a test case to be invoked multiple times, the test
> > case must be annotated by @TestTemplate. A test case can not be annotated
> > by both @Test and @TestTemplate, which means a test case can not be
> invoked
> > as both a parameterized test and a non parameterized test.
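
To make that difference concrete, here is a minimal illustration with made-up
class and parameter names. In JUnit 4 the parameters are attached to the class,
so every @Test method, including inherited ones, is repeated per parameter set;
in JUnit 5 the repetition is declared on the method itself, e.g. via
@ParameterizedTest (or via @TestTemplate plus a custom invocation context
provider when class-level fixtures are needed):

import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

// JUnit 4: parameters live on the class, so every @Test method
// (including inherited ones) runs once per parameter set.
@RunWith(Parameterized.class)
public class SerializerCompatibilityTest {

    @Parameterized.Parameters(name = "compression = {0}")
    public static Collection<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    @Parameterized.Parameter
    public boolean compression;

    @Test
    public void roundTrip() {
        // executed twice, once per parameter
    }
}

// JUnit 5: the repetition is declared on the method itself.
class SerializerCompatibilityJUnit5Test {

    @ParameterizedTest(name = "compression = {0}")
    @ValueSource(booleans = {true, false})
    void roundTrip(boolean compression) {
        // same two invocations, parameter passed as a method argument
    }
}
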
> >
> > We thought of two ways to migrate this situation, but not good enough.
> Both
> > two ways will introduce redundant codes, and make it hard to maintain.
> >
> > The first way is to change the parent class to a parameterized test and
> > replace @Test tests to @TestTemplate tests. For its non parameterized
> > subclasses, we provide them a fake parameter method, which will provide
> > 

Re: [DISCUSS] FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP)

2022-01-05 Thread Till Rohrmann
Thanks for the detailed explanation Becket.

Do you think that an additional dependency is a deal breaker for people to
use dynamic CEP patterns? At the very least people have to operate some
kind of storage/queue system from which the CEP job can read anyway. Maybe
it could be good enough to provide a REST endpoint (as a separate tool)
that can be instantiated with a Flink sink to ingest REST requests
into some queue. A general concern I have is that by making the JM a REST
ingestion point for data will push another responsibility to Flink and
increase the surface area further.

As for how to handle data that cannot be processed by a pattern, I think
other solutions exist as well. I could imagine letting users define
different failover strategies: e.g. the record could simply be ignored, the
pattern could be deactivated on the affected TM, or the processing could fail.
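
Just to make that idea concrete, such strategies could look roughly like the
following sketch (purely illustrative; neither this enum nor its name exists
in Flink today):

// Hypothetical per-pattern failover strategies, names made up for illustration.
public enum PatternFailoverStrategy {
    IGNORE_RECORD,       // silently skip the record the pattern cannot process
    DEACTIVATE_PATTERN,  // stop evaluating the pattern on the affected TM
    FAIL_PROCESSING      // fail the task and rely on Flink's regular failover
}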

Maybe we are coupling the dynamic CEP pattern effort too tightly to where the
new patterns come from. Maybe we can split these efforts into supporting
dynamic CEP patterns on the TM reading from some source and then fork off
the discussion about introducing a user controlled control plane to Flink.
That way we wouldn't block this effort and could discuss more about the
exact properties such a user control plane would need to have. What do you
think?

Cheers,
Till

On Wed, Jan 5, 2022 at 7:18 AM Becket Qin  wrote:

> Hi Till,
>
> Thanks for the comments and questions. To be clear, I am not saying that
> the side-input stream does not work for the dynamic pattern update use
> case. But I think OC is a better solution. The design goal for CEP dynamic
> patterns is not only to make it work, but also to make it user-friendly and
> extensible. So as a user, I expect the following:
>
> - Use CEP with dynamic pattern update without depending on external
> systems. e.g. updating the pattern by directly talking to the Flink job
> itself.
> - Some sort of isolation between patterns. e.g. a pattern update failure
> won't cause other running patterns to fail.
> - Easy to query the currently running patterns in the Flink job.
> - Extensible to add new features. e.g. apply some pattern to a subset of
> subtasks. Disable dynamic pattern update during a service window, etc.
>
> It looks to me that OC is a more promising way to achieve the above.
>
> Please also see the reply to your questions inline below.
>
> > You mentioned that a TM might find out that a pattern is invalid and then
> > it could use the 2-way communication with the JM to tell the other TMs
> that
> > the pattern is invalid. How exactly would a TM detect that a pattern is
> > invalid at data processing time? And assuming that this is possible for a
> > single TM, why can't the other TMs not detect this problem? Is it because
> > the validity of patterns might depend on data that is different between
> > TMs?
>
>
> Yes, the TMs are processing different records, so some may encounter issues
> while others are running fine; for example, a field may be missing from a
> record while the pattern requires it. Even if all the TMs have detected the
> problem, without a channel to report back the issue, the only thing the TMs
> can do is throw an exception and stop processing.
>
> You also mentioned that deep-learning-on-flink could benefit from an OC
> > control plane but I couldn't find the referenced link. Would
> > deep-learning-on-flink like to use OCs as a kind of parameter server?
> Could
> > you elaborate on deep-learning-on-flink's usage of such a feature?
>
>
> deep-learning-on-flink[1] has a general ML cluster abstraction that can run
> TF / PyTorch etc. This cluster has an ML master role to manage the
> liveness of all the ML workers, which are Python processes running
> side-by-side with the TMs. The ML workers can be PS or Worker nodes of
> TensorFlow. For example, if 10 TF worker and 5 TF PS nodes are needed, a
> Flink UDF operator with parallelism 10 and a Flink Source operator with
> parallelism 5 will be created to run the TF workers and TF PS nodes
> respectively. All 15 nodes running either PS or Worker are managed by the
> ML master. Prior to OC, the implementation used a Source operator with
> parallelism 1 to run the master and let the ML workers register themselves
> with an external ZK so the master could discover them. So basically users
> had to build their own control plane. With OC, at the very least, we no
> longer need ZK.
>
> > Concerning reprocessing command history and repeatable processing results
> > I think there is actually quite a big difference between using a Flink
> > source vs. offering a REST server that can receive commands that are
> > distributed via the OCs. In the former case we can rely on the external
> > system to persist commands whereas in the latter approach we/users have to
> > implement a storage solution on their own (in the simplest case users will
> > probably store everything in state which might grow indefinitely if stored
> > as a changelog).
>
>

Re: [DISCUSS] Looking for maintainers for Google PubSub connector or discuss next step

2022-01-05 Thread Martijn Visser
Hi Ryan,

Like Till said, your help would be much appreciated. The open tickets are
the most pressing ones for PubSub. The first ticket also has an open PR
that could be interesting to go through. Feel free to reach out to the Dev
mailing list for any questions or review requests.

Best regards,

Martijn

On Wed, 5 Jan 2022 at 10:22, Till Rohrmann  wrote:

> Thanks a lot for helping us with the PubSub connector Ryan. This is highly
> appreciated! I think going through the open PRs and open issues might be a
> good first step.
>
> Cheers,
> Till
>
> On Tue, Jan 4, 2022 at 5:42 PM Ryan Skraba 
> wrote:
>
> > Hello,
> >
> > I'm familiar with the Pub/Sub connectors from the Apache Beam project,
> but
> > quite a bit less so with Flink.  This looks like a good learning
> > opportunity, and I'd be interested in helping out here.
> >
> > If we decide to keep the connector, I can start taking a look at the next
> > step: going through the existing PR, fixing conflicts with master.
> >
> > All my best, Ryan
> >
> > On Mon, Jan 3, 2022 at 3:41 PM Martijn Visser 
> > wrote:
> >
> > > Hi everyone,
> > >
> > > We're looking for community members, who would like to maintain Flink's
> > > Google PubSub connector [1] going forward. There are multiple
> improvement
> > > tickets open and the original contributors are currently unable to work
> > on
> > > further improvements.
> > >
> > > An overview of some of the open tickets:
> > >
> > > * https://issues.apache.org/jira/browse/FLINK-20625 -> Refactor PubSub
> > > Source to use new Source API (FLIP-27)
> > > * https://issues.apache.org/jira/browse/FLINK-24298 -> Refactor PubSub
> > > Sink
> > > to use new Sink API (FLIP-143 / FLIP-171)
> > > * https://issues.apache.org/jira/browse/FLINK-24299 -> Make PubSub
> > > available as Source and Sink for Table/SQL users
> > >
> > > Next to these tickets, the connector only supports Java 8 and we can
> > > improve on the tests for this connector.
> > >
> > > If you would like to take on this responsibility or can join this
> effort
> > > in a supporting role, please reach out!
> > >
> > > If we can't find maintainers for this connector, what do you think we
> > > should do? I would be in favour of dropping the connector from Flink.
> We
> > > could also consider moving the connector, either to the new external
> > > connector repository or Apache Bahir. I'm not sure if that would be
> > > valuable, since at some point connector won't work in Flink (since it
> > > doesn't use the target interfaces) and the source code can still be
> found
> > > in Flink's git repo by looking back to previous versions.
> > >
> > > I'm looking forward to your thoughts.
> > >
> > > Best regards,
> > >
> > > Martijn Visser
> > >
> > > [1]
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pubsub/
> > >
> >
>


Re: [DISCUSS] FLIP-208: Update KafkaSource to detect EOF based on de-serialized record

2022-01-05 Thread Qingsheng Ren
Hi Dong, 

Thanks for making this FLIP. I share the same concern as Martijn. This looks 
like a feature that could be shared across all sources, so I think it'll be 
great to make it a general one. 

Instead of passing the RecordEvaluator to SourceReaderBase, what about 
embedding the evaluator into SourceOperator? We can create a wrapper 
SourceOutput in SourceOperator and intercept records emitted by SourceReader. 
This would make the feature independent of the SourceReader implementation, 
so it's applicable to all sources. The API for users would look like:

env.fromSource(source, watermarkStrategy, name, recordEvaluator)

or

env.fromSource(…).withRecordEvaluator(evaluator)
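
Either way, the evaluator itself could be a simple functional interface. A
minimal sketch, just for illustration (the name and signature below, like the
fromSource overload above, are assumptions from this discussion rather than an
existing API):

// Hypothetical contract: return true once a de-serialized record marks the end of the stream.
@FunctionalInterface
public interface RecordEvaluator<T> extends java.io.Serializable {
    boolean isEndOfStream(T record);
}

// e.g. stop consuming once a sentinel record is seen:
RecordEvaluator<String> evaluator = record -> "END_OF_STREAM".equals(record);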

What do you think?

Best regards, 

Qingsheng

> On Jan 4, 2022, at 3:31 PM, Martijn Visser  wrote:
> 
> Hi Dong,
> 
> Thanks for writing the FLIP. It focusses only on the KafkaSource, but I
> would expect that if such a functionality is desired, it should be made
> available for all unbounded sources (for example, Pulsar and Kinesis). If
> it's only available for Kafka, I see it as if we're increasing feature
> sparsity while we actually want to decrease that. What do you think?
> 
> Best regards,
> 
> Martijn
> 
> On Tue, 4 Jan 2022 at 08:04, Dong Lin  wrote:
> 
>> Hi all,
>> 
>> We created FLIP-208: Update KafkaSource to detect EOF based on
>> de-serialized records. Please find the KIP wiki in the link
>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-208%3A+Update+KafkaSource+to+detect+EOF+based+on+de-serialized+records
>> .
>> 
>> This FLIP aims to address the use-case where users need to stop a Flink job
>> gracefully based on the content of de-serialized records observed in the
>> KafkaSource. This feature is needed by users who currently depend on
>> KafkaDeserializationSchema::isEndOfStream() to migrate their Flink job from
>> FlinkKafkaConsumer to KafkaSource.
>> 
>> Could you help review this FLIP when you get time? Your comments are
>> appreciated!
>> 
>> Cheers,
>> Dong
>> 



[jira] [Created] (FLINK-25529) java.lang.ClassNotFoundException: org.apache.orc.PhysicalWriter when write bulkly into hive-2.1.1 orc table

2022-01-05 Thread Yuan Zhu (Jira)
Yuan Zhu created FLINK-25529:


 Summary: java.lang.ClassNotFoundException: 
org.apache.orc.PhysicalWriter when write bulkly into hive-2.1.1 orc table
 Key: FLINK-25529
 URL: https://issues.apache.org/jira/browse/FLINK-25529
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
 Environment: hive 2.1.1

flink 1.12.4
Reporter: Yuan Zhu
 Attachments: lib.jpg

I tried to write data in bulk into hive-2.1.1 with ORC format, and encountered 
java.lang.ClassNotFoundException: org.apache.orc.PhysicalWriter

 

Using the bulk writer by setting table.exec.hive.fallback-mapred-writer = false;

 
{code:java}
SET 'table.sql-dialect'='hive';
create table orders(
    order_id int,
    order_date timestamp,
    customer_name string,
    price decimal(10,3),
    product_id int,
    order_status boolean
)partitioned by (dt string)
stored as orc;
 
SET 'table.sql-dialect'='default';

create table datagen_source (
order_id int,
order_date timestamp(9),
customer_name varchar,
price decimal(10,3),
product_id int,
order_status boolean
)with('connector' = 'datagen');

create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/mnt/conf');
set table.exec.hive.fallback-mapred-writer = false;

insert into myhive.`default`.orders
/*+ OPTIONS(
    'sink.partition-commit.trigger'='process-time',
    'sink.partition-commit.policy.kind'='metastore,success-file',
    'sink.rolling-policy.file-size'='128MB',
    'sink.rolling-policy.rollover-interval'='10s',
    'sink.rolling-policy.check-interval'='10s',
    'auto-compaction'='true',
    'compaction.file-size'='1MB'    ) */
select *, date_format(now(),'yyyy-MM-dd') as dt from datagen_source;  {code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.orc.PhysicalWriter

 

My jars in lib dir listed in attachment.

In HiveTableSink#createStreamSink (line 270), createBulkWriterFactory is called 
if table.exec.hive.fallback-mapred-writer is false.

If the table is ORC, HiveShimV200#createOrcBulkWriterFactory will be invoked. 

OrcBulkWriterFactory depends on org.apache.orc.PhysicalWriter in orc-core, but 
flink-connector-hive excludes orc-core because it conflicts with hive-exec.

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] Looking for maintainers for Google PubSub connector or discuss next step

2022-01-05 Thread Till Rohrmann
Thanks a lot for helping us with the PubSub connector Ryan. This is highly
appreciated! I think going through the open PRs and open issues might be a
good first step.

Cheers,
Till

On Tue, Jan 4, 2022 at 5:42 PM Ryan Skraba 
wrote:

> Hello,
>
> I'm familiar with the Pub/Sub connectors from the Apache Beam project, but
> quite a bit less so with Flink.  This looks like a good learning
> opportunity, and I'd be interested in helping out here.
>
> If we decide to keep the connector, I can start taking a look at the next
> step: going through the existing PR, fixing conflicts with master.
>
> All my best, Ryan
>
> On Mon, Jan 3, 2022 at 3:41 PM Martijn Visser 
> wrote:
>
> > Hi everyone,
> >
> > We're looking for community members, who would like to maintain Flink's
> > Google PubSub connector [1] going forward. There are multiple improvement
> > tickets open and the original contributors are currently unable to work
> on
> > further improvements.
> >
> > An overview of some of the open tickets:
> >
> > * https://issues.apache.org/jira/browse/FLINK-20625 -> Refactor PubSub
> > Source to use new Source API (FLIP-27)
> > * https://issues.apache.org/jira/browse/FLINK-24298 -> Refactor PubSub
> > Sink
> > to use new Sink API (FLIP-143 / FLIP-171)
> > * https://issues.apache.org/jira/browse/FLINK-24299 -> Make PubSub
> > available as Source and Sink for Table/SQL users
> >
> > Next to these tickets, the connector only supports Java 8 and we can
> > improve on the tests for this connector.
> >
> > If you would like to take on this responsibility or can join this effort
> > in a supporting role, please reach out!
> >
> > If we can't find maintainers for this connector, what do you think we
> > should do? I would be in favour of dropping the connector from Flink. We
> > could also consider moving the connector, either to the new external
> > connector repository or Apache Bahir. I'm not sure if that would be
> > valuable, since at some point connector won't work in Flink (since it
> > doesn't use the target interfaces) and the source code can still be found
> > in Flink's git repo by looking back to previous versions.
> >
> > I'm looking forward to your thoughts.
> >
> > Best regards,
> >
> > Martijn Visser
> >
> > [1]
> >
> >
> https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/pubsub/
> >
>


Re: [VOTE] FLIP-191: Extend unified Sink interface to support small file compaction

2022-01-05 Thread Jing Ge
+1 (non-binding).

Thanks for driving!

Best regards
Jing

On Wed, Jan 5, 2022 at 4:13 AM Guowei Ma  wrote:

> +1(binding).
> Thank you for driving this
> Best,
> Guowei
>
>
> On Wed, Jan 5, 2022 at 5:15 AM Arvid Heise  wrote:
>
> > +1 (binding).
> >
> > Thanks for driving!
> >
> > On Tue, Jan 4, 2022 at 10:31 AM Yun Gao 
> > wrote:
> >
> > > +1 (binding).
> > >
> > > Many thanks for proposing the FLIP!
> > >
> > > Best,
> > > Yun
> > >
> > >
> > > --
> > > From:Martijn Visser 
> > > Send Time:2022 Jan. 4 (Tue.) 17:22
> > > To:dev 
> > > Subject:Re: [VOTE] FLIP-191: Extend unified Sink interface to support
> > > small file compaction
> > >
> > > +1 (non-binding). Thanks for driving the FLIP!
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Tue, 4 Jan 2022 at 10:21, Fabian Paul  wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I'd like to start a vote on FLIP-191: Extend unified Sink interface
> to
> > > > support small file compaction [1] that has been discussed in this
> > > > thread [2].
> > > >
> > > > The vote will be open for at least 72 hours unless there is an
> > objection
> > > or
> > > > not enough votes.
> > > >
> > > > Best,
> > > > Fabian
> > > >
> > > > [1]
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
> > > > [2] https://lists.apache.org/thread/97kwy315t9r4j02l8v0wotkll4tngb3m
> > > >
> > >
> > >
> >
>


Re: [DISCUSS] Deprecate MapR FS

2022-01-05 Thread Martijn Visser
Hi everyone,

Thanks for your input. I've checked the MapR implementation and it has no
annotation at all. Given that we thought MapR had already been dropped, I
would propose to immediately remove MapR in Flink 1.15
instead of first marking it as deprecated and removing it in Flink 1.16.

Please let me know what you think.

Best regards,

Martijn

On Thu, 9 Dec 2021 at 17:27, David Morávek  wrote:

> +1, agreed with Seth's reasoning. There has been no real activity in the MapR
> FS module for years [1], so any remaining users should be fine using
> the jars from older Flink versions for quite some time
>
> [1]
> https://github.com/apache/flink/commits/master/flink-filesystems/flink-mapr-fs
>
> Best,
> D.
>
> On Thu, Dec 9, 2021 at 4:28 PM Konstantin Knauf  wrote:
>
>> +1 (what Seth said)
>>
>> On Thu, Dec 9, 2021 at 4:15 PM Seth Wiesman  wrote:
>>
>> > +1
>> >
>> > I actually thought we had already dropped this FS. If anyone is still
>> > relying on it in production, the file system abstraction in Flink has
>> been
>> > incredibly stable over the years. They should be able to use the 1.14
>> MapR
>> > FS with later versions of Flink.
>> >
>> > Seth
>> >
>> > On Wed, Dec 8, 2021 at 10:03 AM Martijn Visser 
>> > wrote:
>> >
>> >> Hi all,
>> >>
>> >> Flink supports multiple file systems [1] which includes MapR FS. MapR
>> as
>> >> a company doesn't exist anymore since 2019, the technology and
>> intellectual
>> >> property has been sold to Hewlett Packard.
>> >>
>> >> I don't think that there's anyone who's using MapR anymore and
>> therefore
>> >> I think it would be good to deprecate this for Flink 1.15 and then
>> remove
>> >> it in Flink 1.16. Removing this from Flink will slightly shrink the
>> >> codebase and CI runtime.
>> >>
>> >> I'm also cross posting this to the User mailing list, in case there's
>> >> still anyone who's using MapR.
>> >>
>> >> Best regards,
>> >>
>> >> Martijn
>> >>
>> >> [1]
>> >>
>> https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/overview/
>> >>
>> >
>>
>> --
>>
>> Konstantin Knauf
>>
>> https://twitter.com/snntrable
>>
>> https://github.com/knaufk
>>
>


[jira] [Created] (FLINK-25528) state processor api do not support increment checkpoint

2022-01-05 Thread Jira
刘方奇 created FLINK-25528:
---

 Summary: state processor api do not support increment checkpoint
 Key: FLINK-25528
 URL: https://issues.apache.org/jira/browse/FLINK-25528
 Project: Flink
  Issue Type: Improvement
  Components: API / State Processor, Runtime / State Backends
Reporter: 刘方奇


As the title says, in the state-processor-api we use the savepoint option to 
snapshot state by default in org.apache.flink.state.api.output.SnapshotUtils.
But in many cases, we may need to snapshot state incrementally, which has 
better performance than a savepoint.

Shall we add a config option to choose the checkpoint type, just like the 
Flink streaming state backends?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25527) Add StringIndexer in FlinkML

2022-01-05 Thread Zhipeng Zhang (Jira)
Zhipeng Zhang created FLINK-25527:
-

 Summary: Add StringIndexer in FlinkML
 Key: FLINK-25527
 URL: https://issues.apache.org/jira/browse/FLINK-25527
 Project: Flink
  Issue Type: New Feature
  Components: Library / Machine Learning
Reporter: Zhipeng Zhang


Add Transformer and Estimator for StringIndexer in FlinkML



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

2022-01-05 Thread Yun Gao
Hi Xuannan,

Many thanks for drafting the FLIP and initiating the discussion!

I have several questions, sorry if I have any misunderstandings:

1. With the cached stream, when would the compilation and job submission
happen? Does it happen on calling execute_and_cache()? If so, could
we support a job with multiple concurrent cached streams, like

DataStream a = ...
DataStream b = ...
a.cache()
b.cache()
// could we run a and b in the same job? 

If not, might we end up with parts of the graph that would not be executed? 

2. If the part of the graph using the cached partition is executed as a second
job, would that job be executed after its preceding jobs have finished?
Would the StreamExecutionEnvironment do this tracking? 

3. Would the execution have the same compile result when running in per-job 
vs. application / session mode? For per-job mode, when executing the 
part of the graph that uses the cached result, we might need to run the whole 
graph from the sources; but for application / session mode, it would be
compiled into a separate job reading from the cached result partitions. If the 
compile result is different, perhaps we currently cannot determine the execution 
mode when compiling? 

4. For the part of the graph using the cached result, do we support the streaming 
case? For example, a part of the graph that has two sources, where one source is 
a cached result partition and the other one is an unbounded source.

For the remote shuffle service, it seems to me that we currently do not have a
complete process for it to support caching the ResultPartition, since
in JobMasterPartitionTrackerImpl we do not yet support promoting a result
partition via the pluggable ShuffleMaster. But we should be able to further
complete this part.

Best,
Yun


--
From:Xuannan Su 
Send Time:2022 Jan. 5 (Wed.) 14:04
To:dev 
Subject:Re: [DISCUSS] FLIP-205: Support cache in DataStream for Batch Processing

Hi David,

We have a description in the FLIP about the case of TM failure without
the remote shuffle service. Basically, since the partitions are stored
at the TM, a TM failure requires recomputing the intermediate result.

If a Flink job uses the remote shuffle service, the partitions are
stored at the remote shuffle service. In this case, the failure of TM
will not cause any partition loss. Therefore, recomputing the
intermediate result is not required. In case of partition lost at the
remote shuffle service, even without a TM failure, the cached
intermediate result is not valid anymore, so the intermediate result
has to be recomputed.

To summarize, no matter where the partitions are stored, locally at TM
or remotely at remote shuffle service, recomputing is only required if
the consuming job finds some partitions lost.

I updated the FLIP to include the description of failover when using
remote shuffle service.

Best,
Xuannan


On Mon, Jan 3, 2022 at 4:17 PM David Morávek  wrote:
>
> One more question from my side, should we make sure this plays well with
> the remote shuffle service [1] in case of TM failure?
>
> [1] https://github.com/flink-extended/flink-remote-shuffle
>
> D.
>
> On Thu, Dec 30, 2021 at 11:59 AM Gen Luo  wrote:
>
> > Hi Xuannan,
> >
> > I found FLIP-188[1] that is aiming to introduce a built-in dynamic table
> > storage, which provides a unified changelog & table representation. Tables
> > stored there can be used in further ad-hoc queries. To my understanding,
> > it's quite like an implementation of caching in Table API, and the ad-hoc
> > queries are somehow like further steps in an interactive program.
> >
> > As you replied, caching at Table/SQL API is the next step, as a part of
> > interactive programming in Table API, which we all agree is the major
> > scenario. What do you think about the relation between it and FLIP-188?
> >
> > [1]
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
> >
> >
> > On Wed, Dec 29, 2021 at 7:53 PM Xuannan Su  wrote:
> >
> > > Hi David,
> > >
> > > Thanks for sharing your thoughts.
> > >
> > > You are right that most people tend to use high-level API for
> > > interactive data exploration. Actually, there is
> > > the FLIP-36 [1] covering the cache API at Table/SQL API. As far as I
> > > know, it has been accepted but hasn’t been implemented. At the time
> > > when it is drafted, DataStream did not support Batch mode but Table
> > > API does.
> > >
> > > Now that the DataStream API does support batch processing, I think we
> > > can focus on supporting cache at DataStream first. It is still
> > > valuable for DataStream users and most of the work we do in this FLIP
> > > can be reused. So I want to limit the scope of this FLIP.
> > >
> > > After caching is supported at DataStream, we can continue from where
> > > FLIP-36 left off to support caching at Table/SQL API. We might have to
> > > re-vote FLIP-36 or draft a new FLIP. What do you think?
> > >
> > > Best,
> > > Xuannan

[jira] [Created] (FLINK-25526) Deprecate TableSinkFactory, TableSourceFactory and TableFormatFactory

2022-01-05 Thread Francesco Guardiani (Jira)
Francesco Guardiani created FLINK-25526:
---

 Summary: Deprecate TableSinkFactory, TableSourceFactory and 
TableFormatFactory
 Key: FLINK-25526
 URL: https://issues.apache.org/jira/browse/FLINK-25526
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / API
Reporter: Francesco Guardiani
 Fix For: 1.15.0


These factories are part of the old type system/sink & source stack and should 
not be used anymore, as users should work with 
DynamicTableSink/DynamicTableSource factories instead.

This task should deprecate {{TableSinkFactory}}, {{TableSourceFactory}} and 
{{TableFormatFactory}} and all the related types.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25525) flink-examples-table is not runnable in the IDE

2022-01-05 Thread Timo Walther (Jira)
Timo Walther created FLINK-25525:


 Summary: flink-examples-table is not runnable in the IDE
 Key: FLINK-25525
 URL: https://issues.apache.org/jira/browse/FLINK-25525
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Ecosystem
Reporter: Timo Walther


The following exception is thrown:

 

{code}

Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Could not find any factories that implement 
'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
    at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:453)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:295)
    at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:266)
    at 
org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:95)
    at 
org.apache.flink.table.examples.scala.basics.GettingStartedExample$.main(GettingStartedExample.scala:55)
    at 
org.apache.flink.table.examples.scala.basics.GettingStartedExample.main(GettingStartedExample.scala)

{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25524) If enabled changelog, RocksDB incremental checkpoint would always be full

2022-01-05 Thread Yun Tang (Jira)
Yun Tang created FLINK-25524:


 Summary: If enabled changelog, RocksDB incremental checkpoint 
would always be full
 Key: FLINK-25524
 URL: https://issues.apache.org/jira/browse/FLINK-25524
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / State Backends
Reporter: Yun Tang


Once changelog is enabled, the RocksDB incremental checkpoint would only be 
executed during materialization. During this phase, it will use the 
{{materialization id}} as the checkpoint id for the RocksDB state backend's 
snapshot method.

However, the current incremental checkpoint mechanism heavily depends on the 
checkpoint id, and the {{uploadedStateIDs}} map (a {{SortedMap}} keyed by 
checkpoint id) within {{RocksIncrementalSnapshotStrategy}} is the kernel of 
incremental checkpointing. Once we notify checkpoint completion of a previous 
checkpoint, it will then remove the uploaded state IDs of that checkpoint, so 
we cannot get proper checkpoint information on the next 
RocksDBKeyedStateBackend#snapshot. That is to say, we will always upload all 
RocksDB artifacts.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25523) KafkaSourceITCase$KafkaSpecificTests.testTimestamp fails on AZP

2022-01-05 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25523:
-

 Summary: KafkaSourceITCase$KafkaSpecificTests.testTimestamp fails 
on AZP
 Key: FLINK-25523
 URL: https://issues.apache.org/jira/browse/FLINK-25523
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.15.0
Reporter: Till Rohrmann
 Fix For: 1.15.0


The test {{KafkaSourceITCase$KafkaSpecificTests.testTimestamp}} fails on AZP 
with

{code}
2022-01-05T03:08:57.1647316Z java.util.concurrent.TimeoutException: The topic 
metadata failed to propagate to Kafka broker.
2022-01-05T03:08:57.1660635Zat 
org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:214)
2022-01-05T03:08:57.1667856Zat 
org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:230)
2022-01-05T03:08:57.1668778Zat 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:216)
2022-01-05T03:08:57.1670072Zat 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
2022-01-05T03:08:57.1671078Zat 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
2022-01-05T03:08:57.1671942Zat 
org.apache.flink.connector.kafka.source.KafkaSourceITCase$KafkaSpecificTests.testTimestamp(KafkaSourceITCase.java:104)
2022-01-05T03:08:57.1672619Zat 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-01-05T03:08:57.1673715Zat 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-01-05T03:08:57.1675000Zat 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-01-05T03:08:57.1675907Zat 
java.base/java.lang.reflect.Method.invoke(Method.java:566)
2022-01-05T03:08:57.1676587Zat 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
2022-01-05T03:08:57.1677316Zat 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
2022-01-05T03:08:57.1678380Zat 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
2022-01-05T03:08:57.1679264Zat 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
2022-01-05T03:08:57.1680002Zat 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
2022-01-05T03:08:57.1680776Zat 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
2022-01-05T03:08:57.1681682Zat 
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
2022-01-05T03:08:57.1682442Zat 
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
2022-01-05T03:08:57.1683450Zat 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
2022-01-05T03:08:57.1685362Zat 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
2022-01-05T03:08:57.1686284Zat 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
2022-01-05T03:08:57.1687152Zat 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
2022-01-05T03:08:57.1687818Zat 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
2022-01-05T03:08:57.1688479Zat 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
2022-01-05T03:08:57.1689376Zat 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
2022-01-05T03:08:57.1690108Zat 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2022-01-05T03:08:57.1690825Zat 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
2022-01-05T03:08:57.1691470Zat 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
2022-01-05T03:08:57.1692151Zat 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
2022-01-05T03:08:57.1693014Zat 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
2022-01-05T03:08:57.1693762Zat 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
2022-01-05T03:08:57.1694461Zat 

[jira] [Created] (FLINK-25522) KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecoveryProcessingTime

2022-01-05 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-25522:
-

 Summary: 
KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecoveryProcessingTime
 Key: FLINK-25522
 URL: https://issues.apache.org/jira/browse/FLINK-25522
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.15.0
Reporter: Till Rohrmann
 Fix For: 1.15.0


The test 
{{KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecoveryProcessingTime}}
 failed on AZP with:

{code}
2022-01-05T04:31:25.7208273Z java.util.concurrent.TimeoutException: The topic 
metadata failed to propagate to Kafka broker.
2022-01-05T04:31:25.7210543Zat 
org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:214)
2022-01-05T04:31:25.7211289Zat 
org.apache.flink.core.testutils.CommonTestUtils.waitUtil(CommonTestUtils.java:230)
2022-01-05T04:31:25.7212025Zat 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:216)
2022-01-05T04:31:25.7212944Zat 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
2022-01-05T04:31:25.7213794Zat 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
2022-01-05T04:31:25.7214854Zat 
org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecovery(KafkaShuffleExactlyOnceITCase.java:158)
2022-01-05T04:31:25.7215823Zat 
org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecoveryProcessingTime(KafkaShuffleExactlyOnceITCase.java:81)
2022-01-05T04:31:25.7216532Zat 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-01-05T04:31:25.7217307Zat 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
2022-01-05T04:31:25.7217917Zat 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2022-01-05T04:31:25.7218437Zat 
java.lang.reflect.Method.invoke(Method.java:498)
2022-01-05T04:31:25.7218969Zat 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
2022-01-05T04:31:25.7219572Zat 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
2022-01-05T04:31:25.7220183Zat 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
2022-01-05T04:31:25.7220770Zat 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
2022-01-05T04:31:25.7221346Zat 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
2022-01-05T04:31:25.7221959Zat 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
2022-01-05T04:31:25.7222603Zat 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
2022-01-05T04:31:25.7223413Zat 
java.util.concurrent.FutureTask.run(FutureTask.java:266)
2022-01-05T04:31:25.7223871Zat java.lang.Thread.run(Thread.java:748)
2022-01-05T04:31:25.7321823Z java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TopicExistsException: Topic 
'partition_failure_recovery_ProcessingTime' already exists.
2022-01-05T04:31:25.7323411Zat 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
2022-01-05T04:31:25.7324069Zat 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
2022-01-05T04:31:25.7324696Zat 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
2022-01-05T04:31:25.7325309Zat 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
2022-01-05T04:31:25.7326077Zat 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:214)
2022-01-05T04:31:25.7326999Zat 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:98)
2022-01-05T04:31:25.7327659Zat 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:216)
2022-01-05T04:31:25.7328418Zat 
org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecovery(KafkaShuffleExactlyOnceITCase.java:158)
2022-01-05T04:31:25.7329328Zat 
org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleExactlyOnceITCase.testAssignedToPartitionFailureRecoveryProcessingTime(KafkaShuffleExactlyOnceITCase.java:81)
2022-01-05T04:31:25.7330013Zat 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
2022-01-05T04:31:25.7330507Zat 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

Re: [DISCUSS] Change some default config values of blocking shuffle

2022-01-05 Thread Yun Gao
Many thanks @Yingjie for completing the experiments! 

Also +1 for changing the default config values. From the experiments, 
changing the default config values would largely improve the out-of-the-box
experience of Flink batch, thus it seems worth changing from my side
even if it would cause some compatibility issues under some scenarios. In
addition, if we finally have to break compatibility, we might as well do it 
early to avoid affecting more users. 
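
For readers who want to try this out, the proposed defaults quoted below
correspond to the following flink-conf.yaml entries (values taken from
Yingjie's test setup):

taskmanager.network.sort-shuffle.min-parallelism: 1
taskmanager.network.sort-shuffle.min-buffers: 512
taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m
taskmanager.network.blocking-shuffle.compression.enabled: true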

Best,
Yun


--
From:刘建刚 
Send Time:2022 Jan. 4 (Tue.) 20:43
To:user-zh 
Cc:dev ; user 
Subject:Re: [DISCUSS] Change some default config values of blocking shuffle

Thanks for the experiment. +1 for the changes.

Yingjie Cao  于2022年1月4日周二 17:35写道:

> Hi all,
>
> After running some tests with the proposed default values (
> taskmanager.network.sort-shuffle.min-parallelism: 1,
> taskmanager.network.sort-shuffle.min-buffers: 512,
> taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m,
> taskmanager.network.blocking-shuffle.compression.enabled: true), I'd like to
> share some test results.
>
> 1. TPC-DS performance and stability test (I ran the TPC-DS benchmark with 512
> default parallelism and several different settings multiple times):
> 1) Stability:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query stability a lot. With the current default, there
> are many queries suffering from blocking-shuffle-related failures. With
> the proposed default values, only three queries fail because of the
> "Insufficient number of network buffers:" error. With 512 parallelism, the
> current default configuration will incur the same issue. Part of the reason
> is that the network memory consumed by an InputGate is proportional to the
> parallelism and can be 32M by default, and many tasks have
> several InputGates, but we only have 128M of network memory per TaskManager by
> default.
> 2) Performance:
> Compared to the current default values, the proposed default values can
> improve the TPC-DS query performance a lot. Except for those queries with a
> small amount of shuffle data, which finish really quickly, the proposed
> default values can bring a 2-10x performance gain. Regarding the default
> value of taskmanager.network.sort-shuffle.min-parallelism proposed by
> Yun, I tested both 1 and 128, and 1 is better for performance, which is as
> expected.
>
> 2. Flink pre-commit stability test:
> I have run all Flink tests with the proposed default values more than
> 20 times. The only instability is the "Insufficient number of network
> buffers:" error for several batch test cases. This error occurs because
> some tests have really limited network buffers and the proposed default
> config values may increase the network buffer consumption for some cases. After
> increasing the total network memory size for these test cases, the issue can be
> solved.
>
> Summary:
> 1. The proposed default values can improve both the performance and
> stability of the Flink batch shuffle a lot.
> 2. Some batch jobs may fail because of the "Insufficient number of network
> buffers:" error, since this default value change will increase the network
> buffer consumption a little for jobs with less than 512 parallelism (for jobs
> with more than 512 parallelism, network buffer consumption will be reduced).
> 3. Setting taskmanager.network.sort-shuffle.min-parallelism to 1 has
> better performance than setting it to 128; both settings may incur the
> "Insufficient number of network buffers:" error.
> 4. After changing the default value and fixing several test cases, all
> Flink tests (except for those known unstable cases) can run stably.
>
> Personally, I am +1 to make the change. Though the change may cause some
> batch jobs to fail because of the "Insufficient number of network buffers:"
> error, the possibility is small enough (only 3 out of about 100 TPC-DS queries
> fail, and these queries will also fail with the current default configuration
> because it is the InputGate which takes the most network buffers and causes
> the error). Compared to this small regression, the performance and
> stability gains are big. Any feedback, especially from Flink batch
> users, is highly appreciated.
>
> BTW, aside from the above tests, I also tried to tune some more config
> options to try to make the TPC-DS test faster. I copied these tuned config
> options from our daily TPC-DS settings. The results show that the optimized
> configuration can improve the TPC-DS performance by about 30%. Though these
> settings may not be the best, they really help compared to the default values.
> I attached some settings in this mail; I guess some Flink batch users may be
> interested in this. Based on my limited knowledge, I guess that increasing
> the total TaskManager size and network memory size is important for
> performance, because more memory (managed and network) can make operators
> and shuffle faster.
>
> Best,
> Yingjie
>
>
>
> Yingjie Cao