[jira] [Created] (FLINK-21216) StreamPandasConversionTests Fails

2021-01-30 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-21216:
-

 Summary: StreamPandasConversionTests Fails
 Key: FLINK-21216
 URL: https://issues.apache.org/jira/browse/FLINK-21216
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.0
Reporter: Guowei Ma


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12699&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=8d78fe4f-d658-5c70-12f8-4921589024c3]

 
=== FAILURES ===
___ StreamPandasConversionTests.test_empty_to_pandas ___
 
self =  
 
 def test_empty_to_pandas(self): 
> table = self.t_env.from_pandas(self.pdf, self.data_type) 
 
pyflink/table/tests/test_pandas_conversion.py:144: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
pyflink/table/table_environment.py:1462: in from_pandas 
 arrow_schema = pa.Schema.from_pandas(pdf, preserve_index=False) 
pyarrow/types.pxi:1315: in pyarrow.lib.Schema.from_pandas 
 ??? 
.tox/py37-cython/lib/python3.7/site-packages/pyarrow/pandas_compat.py:519: in dataframe_to_types
 type_ = pa.lib._ndarray_to_arrow_type(values, type_) 
pyarrow/array.pxi:53: in pyarrow.lib._ndarray_to_arrow_type 
 ??? 
pyarrow/array.pxi:64: in pyarrow.lib._ndarray_to_type 
 ??? 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
 
> ??? 
E pyarrow.lib.ArrowTypeError: Did not pass numpy.dtype object 
 
pyarrow/error.pxi:108: ArrowTypeError 
_ StreamPandasConversionTests.test_from_pandas _
 
self =  
 
 def test_from_pandas(self): 
> table = self.t_env.from_pandas(self.pdf, self.data_type, 5) 
 
pyflink/table/tests/test_pandas_conversion.py:120: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21215) UnalignedCheckpointITCase.execute

2021-01-30 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-21215:
-

 Summary: UnalignedCheckpointITCase.execute
 Key: FLINK-21215
 URL: https://issues.apache.org/jira/browse/FLINK-21215
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.13.0
Reporter: Guowei Ma


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12691&view=logs&j=34f41360-6c0d-54d3-11a1-0292a2def1d9&t=2d56e022-1ace-542f-bf1a-b37dd63243f2&l=9146]
 
 
[ERROR] Tests run: 11, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 101.136 s <<< FAILURE! - in org.apache.flink.test.checkpointing.UnalignedCheckpointITCase 
[ERROR] execute[parallel pipeline with local channels, p = 5](org.apache.flink.test.checkpointing.UnalignedCheckpointITCase) Time elapsed: 2.263 s <<< ERROR! 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
 at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
 
 at 
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
 
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) 
 at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
 
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) 
 at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) 
 at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
 
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
 
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
 
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) 
 at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) 
 at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
 
 at akka.dispatch.OnComplete.internal(Future.scala:264) 
 at akka.dispatch.OnComplete.internal(Future.scala:261) 
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) 
 at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) 
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) 
 at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
 
 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) 
 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) 
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) 
 at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
 
 at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
 
 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) 
 at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) 
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) 
 at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
 
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
 
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 
 at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
 
 
 
 
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21214) FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint Failed

2021-01-30 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-21214:
-

 Summary: 
FlinkKafkaProducerITCase.testScaleDownBeforeFirstCheckpoint Failed
 Key: FLINK-21214
 URL: https://issues.apache.org/jira/browse/FLINK-21214
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.12.0
Reporter: Guowei Ma


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12687&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5]

 
[ERROR] testScaleDownBeforeFirstCheckpoint(org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerITCase) Time elapsed: 62.857 s <<< ERROR! 
org.apache.kafka.common.errors.TimeoutException: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 6milliseconds while awaiting InitProducerId 
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 6milliseconds while awaiting InitProducerId 
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21213) 'Elasticsearch (v5.3.3) sink end-to-end test' failed

2021-01-30 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-21213:
-

 Summary: 'Elasticsearch (v5.3.3) sink end-to-end test' failed 
 Key: FLINK-21213
 URL: https://issues.apache.org/jira/browse/FLINK-21213
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ElasticSearch
Affects Versions: 1.13.0
Reporter: Guowei Ma


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12686&view=logs&j=68a897ab-3047-5660-245a-cce8f83859f6&t=d47e27f5-9721-5d5f-1cf3-62adbf3d115d]

 
Checking for non-empty .out files... 
No non-empty .out files. 
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21194) TaskExecutorITCase.testJobRecoveryWithFailingTaskExecutor Fail

2021-01-28 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-21194:
-

 Summary: TaskExecutorITCase.testJobRecoveryWithFailingTaskExecutor 
Fail
 Key: FLINK-21194
 URL: https://issues.apache.org/jira/browse/FLINK-21194
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Guowei Ma


 
 
 
 
 at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
 
 at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126) 
 at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418) 
Caused by: akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka://flink/user/rpc/dispatcher_3#376758379]] after [1 ms]. Message 
of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical 
reason for `AskTimeoutException` is that the recipient actor didn't send a 
reply. 
 at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) 
 at akka.pattern.PromiseActorRef$$anonfun$2.apply(AskSupport.scala:635) 
 at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:648) 
 at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) 
 at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
 
 at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) 
 at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) 
 at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
 
 at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:279)
 
 at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:283)
 
 at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:235)
 
 at java.lang.Thread.run(Thread.java:748) 
 
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12622&view=logs&j=59c257d0-c525-593b-261d-e96a86f1926b&t=b93980e3-753f-5433-6a19-13747adae66a



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21175) OneInputStreamTaskTest.testWatermarkMetrics:914 expected:<1> but was:<-9223372036854775808>

2021-01-27 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-21175:
-

 Summary:  OneInputStreamTaskTest.testWatermarkMetrics:914 
expected:<1> but was:<-9223372036854775808>
 Key: FLINK-21175
 URL: https://issues.apache.org/jira/browse/FLINK-21175
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.13.0
    Reporter: Guowei Ma


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12572&view=logs&j=d89de3df-4600-5585-dadc-9bbc9a5e661c&t=66b5c59a-0094-561d-0e44-b149dfdd586d]

 

[ERROR] OneInputStreamTaskTest.testWatermarkMetrics:914 expected:<1> but was:<-9223372036854775808>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21173) "Streaming SQL end-to-end test (Old planner)" e2e test failed

2021-01-27 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-21173:
-

 Summary: "Streaming SQL end-to-end test (Old planner)" e2e test 
failed
 Key: FLINK-21173
 URL: https://issues.apache.org/jira/browse/FLINK-21173
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.12.0
Reporter: Guowei Ma


Jan 28 00:03:37 FAIL StreamSQL: Output hash mismatch. Got e7a02d99d5ac2a4ef4792b3b7e6f54bf, expected b29f14ed221a936211202ff65b51ee26. 

 

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12573&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=ff888d9b-cd34-53cc-d90f-3e446d355529



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21157) "Running HA per-job cluster (rocks, incremental) end-to-end test" gets stuck

2021-01-26 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-21157:
-

 Summary: "Running HA per-job cluster (rocks, incremental) 
end-to-end test" gets stuck
 Key: FLINK-21157
 URL: https://issues.apache.org/jira/browse/FLINK-21157
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.11.3
Reporter: Guowei Ma


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=12533&view=logs&j=91bf6583-3fb2-592f-e4d4-d79d79c3230a&t=03dbd840-5430-533d-d1a7-05d0ebe03873



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21156) org.apache.flink.connector.jdbc.catalog.PostgresCatalogITCase Fail

2021-01-26 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-21156:
-

 Summary: 
org.apache.flink.connector.jdbc.catalog.PostgresCatalogITCase Fail
 Key: FLINK-21156
 URL: https://issues.apache.org/jira/browse/FLINK-21156
 Project: Flink
  Issue Type: Test
  Components: Table SQL / Ecosystem
Affects Versions: 1.11.2
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUNCE] Apache Flink 1.12.1 released

2021-01-19 Thread Guowei Ma
Thanks for Xintong's effort!
Best,
Guowei


On Tue, Jan 19, 2021 at 5:37 PM Yangze Guo  wrote:

> Thanks Xintong for the great work!
>
> Best,
> Yangze Guo
>
> On Tue, Jan 19, 2021 at 4:47 PM Till Rohrmann 
> wrote:
> >
> > Thanks a lot for driving this release Xintong. This was indeed a release
> with some obstacles to overcome and you did it very well!
> >
> > Cheers,
> > Till
> >
> > On Tue, Jan 19, 2021 at 5:59 AM Xingbo Huang  wrote:
> >>
> >> Thanks Xintong for the great work!
> >>
> >> Best,
> >> Xingbo
> >>
> >> > On Tue, Jan 19, 2021 at 12:51 PM, Peter Huang wrote:
> >>
> >> > Thanks for the great effort to make this happen. It paves the way for
> >> > us to use 1.12 soon.
> >> >
> >> > Best Regards
> >> > Peter Huang
> >> >
> >> > On Mon, Jan 18, 2021 at 8:16 PM Yang Wang 
> wrote:
> >> >
> >> > > Thanks Xintong for the great work as our release manager!
> >> > >
> >> > >
> >> > > Best,
> >> > > Yang
> >> > >
> >> > > On Tue, Jan 19, 2021 at 11:53 AM, Xintong Song wrote:
> >> > >
> >> > >> The Apache Flink community is very happy to announce the release of
> >> > >> Apache Flink 1.12.1, which is the first bugfix release for the
> Apache
> >> > Flink
> >> > >> 1.12 series.
> >> > >>
> >> > >> Apache Flink® is an open-source stream processing framework for
> >> > >> distributed, high-performing, always-available, and accurate data
> >> > streaming
> >> > >> applications.
> >> > >>
> >> > >> The release is available for download at:
> >> > >> https://flink.apache.org/downloads.html
> >> > >>
> >> > >> Please check out the release blog post for an overview of the
> >> > >> improvements for this bugfix release:
> >> > >> https://flink.apache.org/news/2021/01/19/release-1.12.1.html
> >> > >>
> >> > >> The full release notes are available in Jira:
> >> > >>
> >> > >>
> >> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349459
> >> > >>
> >> > >> We would like to thank all contributors of the Apache Flink
> community
> >> > who
> >> > >> made this release possible!
> >> > >>
> >> > >> Regards,
> >> > >> Xintong
> >> > >>
> >> > >
> >> >
>


Re: [VOTE] FLIP-147: Support Checkpoint After Tasks Finished

2021-01-18 Thread Guowei Ma
+1 non-binding
Best,
Guowei


On Fri, Jan 15, 2021 at 10:56 PM Yun Gao 
wrote:

>
> Hi all,
>
> I would like to start the vote for FLIP-147[1], which propose to support
> checkpoints after
> tasks finished and is discussed in [2].
>
> The vote will last at least 72 hours (Jan 20th due to weekend), following
> the consensus
> voting process.
>
> thanks,
>  Yun
>
>
> [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
> [2]
> https://lists.apache.org/thread.html/r2780b46267af6e98c7427cb98b36de8218f1499ae098044e1f24c527%40%3Cdev.flink.apache.org%3E


Re: [DISCUSS] Releasing Apache Flink 1.12.1

2020-12-17 Thread Guowei Ma
Thanks for driving this release Xintong.
I think https://issues.apache.org/jira/browse/FLINK-20652 should be
addressed.

Best,
Guowei


On Fri, Dec 18, 2020 at 11:53 AM Jingsong Li  wrote:

> Thanks for volunteering as our release manager Xintong. +1 for releasing
> Flink 1.12.1 soon.
>
> I think https://issues.apache.org/jira/browse/FLINK-20665 should be
> addressed, I marked it as a Blocker.
>
> Best,
> Jingsong
>
> On Fri, Dec 18, 2020 at 11:16 AM Yang Wang  wrote:
>
> > Hi David,
> >
> > I will take a look this ticket FLINK-20648 and try to get it resolved in
> > this release cycle.
> >
> > @Xintong Song 
> > One more Kubernetes HA related issue: we need to support setting the
> > service account for the TaskManager pod [1]. Even though we have a
> > workaround for this issue, it is not acceptable to always require the
> > default service account to have enough permissions.
> >
> > [1]. https://issues.apache.org/jira/browse/FLINK-20664
> >
> > Best,
> > Yang
> >
> >
> > On Fri, Dec 18, 2020 at 12:47 AM, David Morávek wrote:
> >
> > > Hi, I think https://issues.apache.org/jira/browse/FLINK-20648 should
> be
> > > addressed, as Kubernetes HA was one of the main selling points of this
> > > release. WDYT?
> > >
> > > D.
> > >
> > > Sent from my iPhone
> > >
> > > > On 17. 12. 2020, at 13:54, Yun Tang  wrote:
> > > >
> > > > Thanks for driving this quick-fix release.
> > > > +1 for fixing the bug of RocksDB state-backend with reduce operators.
> > > >
> > > > Best
> > > > Yun Tang
> > > > 
> > > > From: Till Rohrmann 
> > > > Sent: Thursday, December 17, 2020 20:51
> > > > To: dev 
> > > > Subject: Re: [DISCUSS] Releasing Apache Flink 1.12.1
> > > >
> > > > Thanks for volunteering as our release manager Xintong. +1 for a
> swift
> > > bug
> > > > fix release.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > >> On Thu, Dec 17, 2020 at 1:20 PM Xintong Song 
> > wrote:
> > > >>
> > > >> Hi devs,
> > > >>
> > > >> It's been one week since we announced Apache Flink 1.12.0, and there
> > are
> > > >> already many issues reported, some of which are quite critical.
> Thus,
> > I
> > > >> would like to start a discussion on releasing Flink 1.12.1 soon.
> > > >>
> > > >> I would like to volunteer for managing this release.
> > > >>
> > > >> I've noticed the following issues that need to be included in the
> new
> > > >> bugfix release.
> > > >>
> > > >>   - The entrypoint script for the official docker image does not
> meet
> > > the
> > > >>   standards of docker-library/official-images repo. [1]
> > > >>   - Streaming jobs with a window-less reduce operation do not work with
> > > >>   the RocksDB state backend. [2]
> > > >>   - @Stephan mentioned some Kafka fixes ([3] and maybe more) that he
> > > would
> > > >>   try to make into this release.
> > > >>   - @Kurt mentioned a batch workload instability related to managed
> > > memory
> > > >>   being released slowly, which his team is currently investigating
> and
> > > >> would
> > > >>   try to fix in this release.
> > > >>
> > > >> Apart from the issues above, please let us know in this thread if
> > there
> > > are
> > > >> any other fixes that we should try to include. I'll try to
> communicate
> > > with
> > > >> the issue owners and come up with a time estimation early next week.
> > > >>
> > > >> Thanks,
> > > >> Xintong
> > > >>
> > > >> [1] https://issues.apache.org/jira/browse/FLINK-20650
> > > >> [2] https://issues.apache.org/jira/browse/FLINK-20646
> > > >> [3] https://issues.apache.org/jira/browse/FLINK-20379
> > > >>
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


[jira] [Created] (FLINK-20652) Improve the documentation so that users can write a DataStream job that can be executed in the batch execution mode

2020-12-17 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-20652:
-

 Summary: Improve the documentation so that users can write a 
DataStream job that can be executed in the batch execution mode
 Key: FLINK-20652
 URL: https://issues.apache.org/jira/browse/FLINK-20652
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.12.0
Reporter: Guowei Ma


Some users, either coming from the `DataSet` API or new to Flink, want to 
write a `DataStream` job that runs in the batch execution mode.

But they cannot easily find a source that actually supports it. Some people 
have asked me offline: "How do I change the `HadoopInputFormat` so that it 
can run in the batch execution mode?"

 

So I would propose adding a description of which sources support the batch 
execution mode to the "DataStream/Connectors" or "DataStream/Execution" 
section.

 

Of course, there might be other ways to achieve this.
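
As an illustration only (not part of the ticket), here is a minimal sketch of such a job. It assumes Flink 1.12's bounded FileSource from flink-connector-files as one example of a source that works in the batch execution mode; the input path is a placeholder.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchModeJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Batch execution mode requires all sources of the job to be bounded.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // The FLIP-27 FileSource is bounded when reading a fixed set of files.
        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("/tmp/input"))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-file-source")
                .map(String::toUpperCase)
                .print();

        env.execute("DataStream job in batch execution mode");
    }
}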

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUNCE] Apache Flink 1.12.0 released

2020-12-13 Thread Guowei Ma
Congratulations to everyone who contributed!
Thanks for being our release managers Dian and Robert!

Best,
Guowei


On Mon, Dec 14, 2020 at 11:41 AM Yang Wang  wrote:

> Congratulations!
> Thanks Robert and Dian for being our release manager and everyone who
> contributed.
>
>
> Best,
> Yang
>
> > On Mon, Dec 14, 2020 at 11:08 AM, Zhu Zhu wrote:
>
> > Congratulations!
> > Thanks Robert and Dian for being the release manager and thanks everyone
> > who helped with the release!
> >
> > Thanks,
> > Zhu
> >
> > > On Fri, Dec 11, 2020 at 11:47 PM, Maximilian Michels wrote:
> >
> > > Great work! It's exciting to see batch execution in streaming
> pipelines.
> > >
> > > -Max
> > >
> > > On 11.12.20 13:34, Niels Basjes wrote:
> > > > Well done everyone!
> > > >
> > > > On Thu, Dec 10, 2020 at 1:17 PM Robert Metzger 
> > > wrote:
> > > >
> > > >> The Apache Flink community is very happy to announce the release of
> > > Apache
> > > >> Flink 1.12.0, which is the latest major release.
> > > >>
> > > >> Apache Flink® is an open-source stream processing framework for
> > > >> distributed, high-performing, always-available, and accurate data
> > > streaming
> > > >> applications.
> > > >>
> > > >> The release is available for download at:
> > > >> https://flink.apache.org/downloads.html
> > > >>
> > > >> Please check out the release blog post for an overview of the
> > > improvements
> > > >> for this bugfix release:
> > > >> https://flink.apache.org/news/2020/12/10/release-1.12.0.html
> > > >>
> > > >> The full release notes are available in Jira:
> > > >>
> > > >>
> > >
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348263
> > > >>
> > > >> We would like to thank all contributors of the Apache Flink
> community
> > > who
> > > >> made this release possible!
> > > >>
> > > >> Regards,
> > > >> Dian & Robert
> > > >>
> > > >>
> > > >
> > >
> >
>


Re: [VOTE] Release 1.12.0, release candidate #3

2020-12-06 Thread Guowei Ma
+1(non-binding)
- build from source
- build a docker image
- start a session from local k8s cluster
- submit a wordcount job in streaming mode.
- submit a wordcount job in batch mode.
Best,
Guowei


On Sat, Dec 5, 2020 at 3:13 PM Zhu Zhu  wrote:

> +1 (binding)
>
> - verified signature and checksum
> - built from source
> - run testing jobs on yarn with manually triggered failures. checked logs
> and WebUI of those jobs
>   * DataStream job (paralelism=1000) with multiple disjoint pipelined
> regions
>   * DataSet job (paralelism=1000) with all edges blocking
>
> Thanks,
> Zhu
>
> On Fri, Dec 4, 2020 at 11:45 PM, Till Rohrmann wrote:
>
> > +1 (binding)
> >
> > * Verified the checksums
> > * Ran RC on Minikube cluster
> > ** Session mode
> > ** Application mode
> > * Built Flink from sources
> >
> > Cheers,
> > Till
> >
> > On Fri, Dec 4, 2020 at 2:15 PM Wei Zhong  wrote:
> >
> > > +1 (non-binding)
> > >
> > > - verified checksums and signatures
> > > - build Flink with Scala 2.11
> > > - pip install pyflink on Windows python 3.7
> > > - run a python job with udfs on Windows
> > > - pyflink shell works well on local mode and remote mode
> > >
> > > Best,
> > > Wei
> > >
> > > > On Dec 4, 2020, at 17:21, Yang Wang wrote:
> > > >
> > > > +1 (non-binding)
> > > >
> > > > * Build from source
> > > > * Deploy Flink cluster in following deployments with HA
> > enabled(ZooKeeper
> > > > and K8s), including kill JobManager and check failover
> > > >  * Native K8s Session
> > > >  * Native K8s Application
> > > >  * Yarn Session
> > > >  * Yarn Per-Job
> > > >  * Yarn Application
> > > > * Check webUI and logs in different deployments especially via
> `kubectl
> > > > logs` in K8s
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > > On Fri, Dec 4, 2020 at 3:00 PM, Xintong Song wrote:
> > > >
> > > >> +1 (non-binding)
> > > >>
> > > >>   - Verified checksums and signatures
> > > >>   - No binaries found in source archive
> > > >>   - Build from source
> > > >>   - Tried a couple of example jobs in various deployment mode
> > > >>  - Local
> > > >>  - Standalone
> > > >>  - Native Kubernetes Application
> > > >>  - Native Kubernetes Session
> > > >>  - Yarn Job
> > > >>  - Yarn Session
> > > >>   - Changing memory configurations, things work as expected
> > > >>   - UI looks good
> > > >>   - Logs look good
> > > >>
> > > >>
> > > >>
> > > >> Thank you~
> > > >>
> > > >> Xintong Song
> > > >>
> > > >>
> > > >>
> > > >> On Thu, Dec 3, 2020 at 9:18 PM Rui Li 
> wrote:
> > > >>
> > > >>> +1 (non-binding)
> > > >>>
> > > >>> Built from source and verified hive connector tests for different
> > hive
> > > >>> versions.
> > > >>> Setup a cluster to connect to a real hive warehouse and run some
> > > queries
> > > >>> successfully.
> > > >>>
> > > >>> On Thu, Dec 3, 2020 at 8:44 PM Xingbo Huang 
> > > wrote:
> > > >>>
> > >  +1 (non-binding)
> > > 
> > >  Checks:
> > >  1. verified checksums and signatures
> > >  2. build Flink with Scala 2.11
> > >  3. pip install pyflink in MacOS/CentOS under py35,py36,py37,py38
> > >  4. test Pandas UDAF/General UDAF/Python DataStream MapFunction
> > >  5. start standalone cluster and submit a python udf job.
> > >  6. verified NOTICE/LICENSE files of some regular modules
> > > 
> > >  I observed that the NOTICE file of flink-sql-connector-hbase-2.2
> > lists
> > > >> 3
> > >  dependencies that are not bundled in:
> > >  commons-lang:commons-lang:2.6
> > >  org.apache.hbase:hbase-hadoop-compat:2.2.3
> > >  org.apache.hbase:hbase-hadoop2-compat:2.2.3
> > > 
> > >  I guess listing more than dependencies with apache licensed
> > shouldn't
> > > >> be
> > > >>> a
> > >  blocker issue. I have opened a PR[1] to fix it.
> > > 
> > >  [1] https://github.com/apache/flink/pull/14299
> > > 
> > >  Best,
> > >  Xingbo
> > > 
> > > > On Thu, Dec 3, 2020 at 5:36 PM, Robert Metzger wrote:
> > > 
> > > > There's now a pull request for the announcement blog post, please
> > > >> help
> > > > checking it: https://github.com/apache/flink-web/pull/397
> > > >
> > > > On Thu, Dec 3, 2020 at 9:03 AM Robert Metzger <
> rmetz...@apache.org
> > >
> > >  wrote:
> > > >
> > > >> +1 (binding)
> > > >>
> > > >>
> > > >> Checks:
> > > >> - checksums seem correct
> > > >> - source archive code compiles
> > > >> - Compiled a test job against the staging repository
> > > >> - launched a standalone cluster, ran some test jobs against it
> > > >> - quickstart contains correct version
> > > >> - regular jars contain correct NOTICE file
> > > >> - Looked a bit over the output of
> > > >> git diff release-1.11.2...release-1.12 --  "**/pom.xml"
> > > >>
> > > >>
> > > >>
> > > >> I noticed that at least one more jar file contains an invalid
> > > >> LICENSE
> > > > file
> > > >> in it's root. This has already been the case with Flink 1.11,
> and
> > > >>> 

[jira] [Created] (FLINK-20339) `FileWriter` support to load StreamingFileSink's state.

2020-11-24 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-20339:
-

 Summary: `FileWriter` support to load StreamingFileSink's state.
 Key: FLINK-20339
 URL: https://issues.apache.org/jira/browse/FLINK-20339
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Affects Versions: 1.12.0
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20338) Make the SinkWriter load previous sink's state.

2020-11-24 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-20338:
-

 Summary: Make the SinkWriter load previous sink's state.
 Key: FLINK-20338
 URL: https://issues.apache.org/jira/browse/FLINK-20338
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Affects Versions: 1.12.0
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20337) Make migrate `StreamingFileSink` to `FileSink` possible

2020-11-24 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-20337:
-

 Summary: Make migrate `StreamingFileSink` to `FileSink` possible
 Key: FLINK-20337
 URL: https://issues.apache.org/jira/browse/FLINK-20337
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.12.0
Reporter: Guowei Ma


Flink 1.12 introduces the `FileSink` in FLINK-19510, which guarantees 
exactly-once semantics in both the streaming and batch execution modes.

We need to figure out how users who currently rely on the `StreamingFileSink` 
can migrate to the `FileSink`.

 

This PR provides a way to make that possible.
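
For context, a minimal sketch (an illustration only, not the migration mechanism itself) of attaching the new FileSink via DataStream#sinkTo, which is what a migrated StreamingFileSink job looks like on the API surface; the output path is a placeholder.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;

public class FileSinkMigrationSketch {
    static void attach(DataStream<String> stream) {
        FileSink<String> sink = FileSink
                .forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
                .build();
        // A state-compatible migration additionally requires the new writer to
        // pick up the StreamingFileSink's old operator state
        // (see FLINK-20338 / FLINK-20339).
        stream.sinkTo(sink);
    }
}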



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20297) Make `SerializerTestBase::getTestData` return List

2020-11-23 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-20297:
-

 Summary: Make `SerializerTestBase::getTestData` return List
 Key: FLINK-20297
 URL: https://issues.apache.org/jira/browse/FLINK-20297
 Project: Flink
  Issue Type: Improvement
  Components: API / Type Serialization System
Reporter: Guowei Ma


Currently `SerializerTestBase::getTestData` returns `T[]`, which cannot be 
overridden from Scala. This means that developers cannot add Scala serializer 
tests based on `SerializerTestBase`.

So I would propose changing `SerializerTestBase::getTestData` to return a 
`List`.
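
A hypothetical before/after sketch of the proposed signature change; the class is reduced to the single method in question and is not the actual patch.

import java.util.List;

public abstract class SerializerTestBaseSketch<T> {
    // Current shape: an array is awkward to provide from a Scala subclass
    // because creating a T[] requires a ClassTag.
    // protected abstract T[] getTestData();

    // Proposed shape: a List<T> can be returned naturally from Scala as well.
    protected abstract List<T> getTestData();
}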



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20225) Broken link in the documentation

2020-11-18 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-20225:
-

 Summary: Broken link in the documentation
 Key: FLINK-20225
 URL: https://issues.apache.org/jira/browse/FLINK-20225
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.12.0
Reporter: Guowei Ma


In the 
[doc|https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#start-a-job-cluster]
 there is a broken link:

A _Flink Job cluster_ is a dedicated cluster which runs a single job. You can 
find more details 
[here|https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#start-a-job-cluster].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20215) Keep the "Access the Flink UI " document same in “Kubernetes Setup” and "Native Kubernetes Setup Beta"

2020-11-18 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-20215:
-

 Summary: Keep the "Access the Flink UI " document same in 
“Kubernetes Setup” and "Native Kubernetes Setup Beta"
 Key: FLINK-20215
 URL: https://issues.apache.org/jira/browse/FLINK-20215
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Affects Versions: 1.12.0
        Reporter: Guowei Ma


Both 
"https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html"
 and 
"https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#accessing-job-manager-ui"
 have a section about how to access the Flink UI.

But the descriptions differ slightly from each other; I think we could make 
them the same.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20214) Unnecessary warning log when starting a k8s session cluster

2020-11-18 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-20214:
-

 Summary: Unnecessary warning log when starting a k8s session 
cluster
 Key: FLINK-20214
 URL: https://issues.apache.org/jira/browse/FLINK-20214
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Affects Versions: 1.12.0
Reporter: Guowei Ma


2020-11-18 17:46:36,727 WARN 
org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator [] - 
Found 0 files in directory null/etc/hadoop, skip to mount the Hadoop 
Configuration ConfigMap.
2020-11-18 17:46:36,727 WARN 
org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator [] - 
Found 0 files in directory null/etc/hadoop, skip to create the Hadoop 
Configuration ConfigMap.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20211) Cannot get the JobManager web IP according to the documentation

2020-11-18 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-20211:
-

 Summary: Cannot get the JobManager web IP according to the 
documentation
 Key: FLINK-20211
 URL: https://issues.apache.org/jira/browse/FLINK-20211
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Affects Versions: 1.12.0
Reporter: Guowei Ma


According to the LoadBalancer section of 
[https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#accessing-job-manager-ui]
 I used the following command:

kubectl get services/cluster-id

But I could not get the EXTERNAL-IP; it always shows none.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20206) Failed to start the session, but there is no clear prompt.

2020-11-17 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-20206:
-

 Summary: Failed to start the session, but there is no clear prompt.
 Key: FLINK-20206
 URL: https://issues.apache.org/jira/browse/FLINK-20206
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Kubernetes
Affects Versions: 1.12.0
Reporter: Guowei Ma
 Attachments: image-2020-11-18-15-12-13-530.png

Use ./bin/kubernetes-session.sh to start a k8s session cluster. The log shows 
that the session cluster started successfully, but it did not.

Personally I prefer the yarn-session behavior, which gives me a clearer 
expectation.

So I would like to propose that Flink give more detailed information about 
whether the session cluster was created successfully or not.

!image-2020-11-18-15-12-13-530.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20203) Could not find any documentation about how to build a Flink image from a local build

2020-11-17 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-20203:
-

 Summary: Could not find any documentation about how to build a Flink 
image from a local build
 Key: FLINK-20203
 URL: https://issues.apache.org/jira/browse/FLINK-20203
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.12.0
Reporter: Guowei Ma


If a user wants to use or try a feature that is not included in the 
"official" Flink image, the user might need to build a Docker image based on a 
local build. But there is no such 
documentation ([https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/docker.html]).

So I would like to propose introducing some documentation about how to build 
the image from a local build.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20031) Keep the uid of SinkWriter same as the SinkTransformation

2020-11-06 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-20031:
-

 Summary: Keep the uid of SinkWriter same as the SinkTransformation
 Key: FLINK-20031
 URL: https://issues.apache.org/jira/browse/FLINK-20031
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma


In the case where we want to migrate the StreamingFileSink to the new sink API, 
we might need to let the user set the SinkWriter's uid to be the same as the 
StreamingFileSink's, so that the SinkWriter operator has the opportunity to 
reuse the old state. (This is just one option.)

 

For this we need to make the SinkWriter operator's uid the same as the 
SinkTransformation's.
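
A minimal sketch (illustrative names, not from the ticket) of why the uid matters: if the writer operator generated for the new sink keeps the uid set on the transformation, it can restore state written by the old StreamingFileSink operator that used the same uid.

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.streaming.api.datastream.DataStream;

class SinkUidSketch {
    static void attach(DataStream<String> stream, FileSink<String> newFileSink) {
        // "file-sink" is an illustrative uid; it must match the uid that was set
        // on the old addSink(StreamingFileSink) operator for its state to be reusable.
        stream.sinkTo(newFileSink).uid("file-sink");
    }
}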

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20007) SinkTransformationTranslator fail to handle the PartitionTransformation

2020-11-05 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-20007:
-

 Summary: SinkTransformationTranslator fail to handle the 
PartitionTransformation
 Key: FLINK-20007
 URL: https://issues.apache.org/jira/browse/FLINK-20007
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma


In the current version, `SinkTransformationTranslator` connects the `SinkWriter` 
to a `PartitionTransformation` if the input transformation of the 
`SinkTransformation` is a `PartitionTransformation`. This leads to a 
`NullPointerException`.

 

Actually, `SinkTransformationTranslator` should connect the `Writer` to the real 
upstream node if the input of the `SinkTransformation` is a 
`PartitionTransformation`.
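
A minimal sketch (an assumed topology, not from the ticket) of a pipeline that puts a PartitionTransformation directly in front of the sink: the keyBy inserts the partitioning step, so the translator must wire the writer to the real upstream node rather than to the partition transformation itself.

import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;

class PartitionedSinkSketch {
    static void attach(DataStream<String> stream, Sink<String, ?, ?, ?> sink) {
        stream.keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                })
                // The keyBy above makes the sink's input a PartitionTransformation.
                .sinkTo(sink);
    }
}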



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19963) Let the `SinkWriter` support using the `TimerService`

2020-11-04 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19963:
-

 Summary: Let the `SinkWriter` support using the `TimerService`
 Key: FLINK-19963
 URL: https://issues.apache.org/jira/browse/FLINK-19963
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma


Some sinks need to register timers with a `TimerService`, for example the 
`StreamingFileWriter`.

So this PR exposes the `TimerService` to the `SinkWriter`.
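
A purely hypothetical sketch of how a writer could use such a service once it is exposed; the interface and method names below are illustrative and not the final API of this ticket.

// Hypothetical shapes, for illustration only.
interface ProcessingTimeService {
    long getCurrentProcessingTime();
    void registerTimer(long timestamp, Runnable callback);
}

interface WriterInitContext {
    ProcessingTimeService getProcessingTimeService();
}

class RollingWriterSketch {
    // A writer such as the StreamingFileWriter could register a timer to roll
    // inactive part files after a configured interval.
    void scheduleRoll(WriterInitContext context, long rolloverIntervalMs) {
        ProcessingTimeService timeService = context.getProcessingTimeService();
        long now = timeService.getCurrentProcessingTime();
        timeService.registerTimer(now + rolloverIntervalMs,
                () -> System.out.println("roll inactive part files"));
    }
}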



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19958) Unified exception signature in Sink API

2020-11-03 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19958:
-

 Summary: Unified exception signature in Sink API
 Key: FLINK-19958
 URL: https://issues.apache.org/jira/browse/FLINK-19958
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma


In the current Sink API, some methods do not declare any exception even though 
they intuitively should, for example `SinkWriter::write`. Other methods throw 
the plain `Exception`, which might be too general.

 

So in this PR we want to change all the methods that need to throw an exception 
to throw `IOException`.
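
A hypothetical before/after sketch of the signature change; the interfaces are simplified for illustration and do not reproduce the real Sink API.

import java.io.IOException;
import java.util.List;

// Before: write() declares no exception although I/O can fail,
// and prepareCommit() throws the overly general Exception.
interface SinkWriterBefore<InputT, CommT> {
    void write(InputT element);
    List<CommT> prepareCommit(boolean flush) throws Exception;
}

// After: both methods consistently declare IOException.
interface SinkWriterAfter<InputT, CommT> {
    void write(InputT element) throws IOException;
    List<CommT> prepareCommit(boolean flush) throws IOException;
}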



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUNCE] New Apache Flink Committer - Congxian Qiu

2020-11-03 Thread Guowei Ma
Congratulations! Congxian

Best,
Guowei


On Wed, Nov 4, 2020 at 10:36 AM godfrey he  wrote:

> Congratulations! Congxian
>
> Best,
> Godfrey
>
> On Mon, Nov 2, 2020 at 7:00 PM, Fabian Hueske wrote:
>
> > Congrats Congxian!
> >
> > Cheers, Fabian
> >
> > On Mon, Nov 2, 2020 at 10:33 AM, Yang Wang <danrtsey...@gmail.com> wrote:
> >
> > > Congratulations Congxian!
> > >
> > > Best,
> > > Yang
> > >
> > > > On Mon, Nov 2, 2020 at 5:14 PM, Zhu Zhu wrote:
> > >
> > > > Congrats Congxian!
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > > On Mon, Nov 2, 2020 at 5:01 PM, Pengcheng Liu wrote:
> > > >
> > > > > Congratulations Congxian!
> > > > >
> > > > > > On Mon, Nov 2, 2020 at 3:57 PM, Matthias Pohl wrote:
> > > > >
> > > > > > Yup, congratulations Congxian!
> > > > > >
> > > > > > On Mon, Nov 2, 2020 at 8:46 AM Danny Chan 
> > > > wrote:
> > > > > >
> > > > > > > Congrats, Doctor Qiu! Well deserved!
> > > > > > >
> > > > > > > > On Sat, Oct 31, 2020 at 9:45 PM, Congxian Qiu wrote:
> > > > > > >
> > > > > > > > Thanks all for the support. It's a great honor for me.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Congxian
> > > > > > > >
> > > > > > > >
> > > > > > > > > On Fri, Oct 30, 2020 at 3:18 PM, Paul Lam wrote:
> > > > > > > >
> > > > > > > > > Congrats, Congxian! Well deserved!
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Paul Lam
> > > > > > > > >
> > > > > > > > > > On Oct 30, 2020, at 15:12, Zhijiang wrote:
> > > > > > > > > >
> > > > > > > > > > Congrats, Congxian!
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-19936) Make SinkITCase more stable

2020-11-02 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19936:
-

 Summary: Make SinkITCase more stable
 Key: FLINK-19936
 URL: https://issues.apache.org/jira/browse/FLINK-19936
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma


In the streaming execution mode there are two sources of uncertainty:
 # The order of receiving the checkpoint-complete notification (the `Committer` 
or the `GlobalCommitter` may receive it first).
 # Whether the operator receives the last checkpoint at all.

These make the `SinkITCase` unstable. 

This PR resolves the `SinkITCase` instability caused by these two uncertainties.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] Remove flink-connector-filesystem module.

2020-10-30 Thread Guowei Ma
+1
Best,
Guowei


On Fri, Oct 30, 2020 at 6:02 PM Aljoscha Krettek 
wrote:

> +1
>
> Aljoscha
>
> On 29.10.20 09:18, Kostas Kloudas wrote:
> > Hi all,
> >
> > Following the discussion in [1], I would like to start a vote on
> > removing the flink-connector-filesystem module which includes the
> > BucketingSink.
> >
> > The vote will be open till November 3rd (72h, excluding the weekend)
> > unless there is an objection or not enough votes.
> >
> > Cheers,
> > Kostas
> >
> > [1]
> https://lists.apache.org/thread.html/re24ceedc02402ac9a6ce1e07b690852320a265b081f416ebac543aaf%40%3Cuser.flink.apache.org%3E
> >
>
>


[jira] [Created] (FLINK-19841) Rename GlobalStreamingCommitterOperator to StreamingGlobalCommitterOperator

2020-10-27 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19841:
-

 Summary: Rename GlobalStreamingCommitterOperator to 
StreamingGlobalCommitterOperator
 Key: FLINK-19841
 URL: https://issues.apache.org/jira/browse/FLINK-19841
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19836) Serialize the committable by the serializer provided by the user during network shuffle

2020-10-27 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19836:
-

 Summary: Serialize the committable by the serializer provided by 
the user during network shuffle
 Key: FLINK-19836
 URL: https://issues.apache.org/jira/browse/FLINK-19836
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19834) Make the TestSink reusable in all the sink related tests.

2020-10-27 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19834:
-

 Summary: Make the TestSink reusable in all the sink related tests.
 Key: FLINK-19834
 URL: https://issues.apache.org/jira/browse/FLINK-19834
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19760) Make the `GlobalCommitter` a standalone interface that does not extend the `Committer`

2020-10-21 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19760:
-

 Summary: Make the `GlobalCommitter` a standalone interface that 
does not extend the `Committer`
 Key: FLINK-19760
 URL: https://issues.apache.org/jira/browse/FLINK-19760
 Project: Flink
  Issue Type: Sub-task
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19738) Remove the `getCommitter` from the `AbstractStreamingCommitterOperator`

2020-10-20 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19738:
-

 Summary: Remove the `getCommitter` from the 
`AbstractStreamingCommitterOperator`
 Key: FLINK-19738
 URL: https://issues.apache.org/jira/browse/FLINK-19738
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19736) Implement the `SinkTransformation`

2020-10-20 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19736:
-

 Summary: Implement the `SinkTransformation`
 Key: FLINK-19736
 URL: https://issues.apache.org/jira/browse/FLINK-19736
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19697) Make the streaming committer retry-able

2020-10-18 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19697:
-

 Summary: Make the streaming committer retry-able
 Key: FLINK-19697
 URL: https://issues.apache.org/jira/browse/FLINK-19697
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19696) Implement Batch committer for the new API

2020-10-18 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19696:
-

 Summary: Implement Batch committer for the new API
 Key: FLINK-19696
 URL: https://issues.apache.org/jira/browse/FLINK-19696
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19586) Implement the stream committer operator

2020-10-12 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19586:
-

 Summary: Implement the stream committer operator
 Key: FLINK-19586
 URL: https://issues.apache.org/jira/browse/FLINK-19586
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19531) Implement the `WriterOperator`

2020-10-07 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19531:
-

 Summary: Implement the `WriterOperator`
 Key: FLINK-19531
 URL: https://issues.apache.org/jira/browse/FLINK-19531
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19512) Introduce the new sink api

2020-10-06 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19512:
-

 Summary: Introduce the new sink api
 Key: FLINK-19512
 URL: https://issues.apache.org/jira/browse/FLINK-19512
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19511) Rename the `SinkTransformation` to `LegacySinkTransformation`

2020-10-06 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19511:
-

 Summary: Rename the `SinkTransformation` to 
`LegacySinkTransformation`
 Key: FLINK-19511
 URL: https://issues.apache.org/jira/browse/FLINK-19511
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19510) Introduce new unified sink api

2020-10-06 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19510:
-

 Summary: Introduce new unified sink api
 Key: FLINK-19510
 URL: https://issues.apache.org/jira/browse/FLINK-19510
 Project: Flink
  Issue Type: New Feature
  Components: API / DataStream
Reporter: Guowei Ma
 Fix For: 1.12.0


As described in FLIP-143: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API 
we want to introduce a new unified sink api.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] FLIP-143: Unified Sink API

2020-10-05 Thread Guowei Ma
I have to correct the vote result as follows:

There were 5 votes, 3 of which are binding:

- Aljoscha Krettek (binding)
- Steven Wu
- Kostas Kloudas (binding)
- Jingsong Li (binding)
- Jiangang Liu

The changes have still been accepted.

Sorry to disturb everyone!

Best,
Guowei


On Wed, Sep 30, 2020 at 10:11 AM Guowei Ma  wrote:

> Hi all,
>
> The voting time for FLIP-143 [1] has passed. I'm closing the vote now.
>
> There were 5 votes, 4 of which are binding:
>
> - Aljoscha Krettek (binding)
> - Steven Wu
> - Kostas Kloudas (binding)
> - Jingsong Li (binding)
> - Jiangang Liu (binding)
>
> There were no -1 votes.
>
> Thus, changes have been accepted. I'll update the FLIP doc accordingly.
>
> There is still an open discussion about the `CommitResult`/`GlobalCommitter`
> interface, which we could continue during the implementation phase.
>
> Thanks everyone for participating and discussing
>
> [1] https://cwiki.apache.org/confluence/x/KEJ4CQ
>
> Best,
> Guowei
>
>
> On Tue, Sep 29, 2020 at 6:32 PM 刘建刚  wrote:
>
>> +1 (binding)
>>
>> Best,
>> Liu Jiangang
>>
>> On Tue, Sep 29, 2020 at 1:36 PM, Jingsong Li wrote:
>>
>> > +1 (binding)
>> >
>> > Best,
>> > Jingsong
>> >
>> > On Mon, Sep 28, 2020 at 3:21 AM Kostas Kloudas 
>> wrote:
>> >
>> > > +1 (binding)
>> > >
>> > > @Steven Wu I think there will be opportunities to fine tune the API
>> > > during the implementation.
>> > >
>> > > Cheers,
>> > > Kostas
>> > >
>> > > On Sun, Sep 27, 2020 at 7:56 PM Steven Wu 
>> wrote:
>> > > >
>> > > > +1 (non-binding)
>> > > >
>> > > > Although I would love to continue the discussion for tweaking the
>> > > > CommitResult/GlobaCommitter interface maybe during the
>> implementation
>> > > phase.
>> > > >
>> > > > On Fri, Sep 25, 2020 at 5:35 AM Aljoscha Krettek <
>> aljos...@apache.org>
>> > > > wrote:
>> > > >
>> > > > > +1 (binding)
>> > > > >
>> > > > > Aljoscha
>> > > > >
>> > > > > On 25.09.20 14:26, Guowei Ma wrote:
>> > > > > >  From the discussion[1] we could find that FLIP focuses on
>> > providing
>> > > an
>> > > > > > unified transactional sink API. So I updated the FLIP's title to
>> > > "Unified
>> > > > > > Transactional Sink API". But I found that the old link could
>> not be
>> > > > > opened
>> > > > > > again.
>> > > > > >
>> > > > > > I would update the link[2] here. Sorry for the inconvenience.
>> > > > > >
>> > > > > > [1]
>> > > > > >
>> > > > >
>> > >
>> >
>> https://lists.apache.org/thread.html/rf09dfeeaf35da5ee98afe559b5a6e955c9f03ade0262727f6b5c4c1e%40%3Cdev.flink.apache.org%3E
>> > > > > > [2] https://cwiki.apache.org/confluence/x/KEJ4CQ
>> > > > > >
>> > > > > > Best,
>> > > > > > Guowei
>> > > > > >
>> > > > > >
>> > > > > > On Thu, Sep 24, 2020 at 8:13 PM Guowei Ma > >
>> > > wrote:
>> > > > > >
>> > > > > >> Hi, all
>> > > > > >>
>> > > > > >> After the discussion in [1], I would like to open a voting
>> thread
>> > > for
>> > > > > >> FLIP-143 [2], which proposes a unified sink api.
>> > > > > >>
>> > > > > >> The vote will be open until September 29th (72h + weekend),
>> unless
>> > > there
>> > > > > >> is an objection or not enough votes.
>> > > > > >>
>> > > > > >> [1]
>> > > > > >>
>> > > > >
>> > >
>> >
>> https://lists.apache.org/thread.html/rf09dfeeaf35da5ee98afe559b5a6e955c9f03ade0262727f6b5c4c1e%40%3Cdev.flink.apache.org%3E
>> > > > > >> [2]
>> > > > > >>
>> > > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
>> > > > > >>
>> > > > > >> Best,
>> > > > > >> Guowei
>> > > > > >>
>> > > > > >
>> > > > >
>> > > > >
>> > >
>> >
>> >
>> > --
>> > Best, Jingsong Lee
>> >
>>
>


Re: Re: [ANNOUNCE] New PMC member: Zhu Zhu

2020-10-05 Thread Guowei Ma
Congratulations Zhu Zhu!
Best,
Guowei


On Tue, Oct 6, 2020 at 5:47 AM Fabian Hueske  wrote:

> Congrats and welcome Zhu!
>
> Best, Fabian
>
> > On Mon, Oct 5, 2020 at 2:33 PM, Yun Gao wrote:
>
> > Congratulations Zhu!
> >
> > Best,
> > Yun
> > --
> > Sender:Arvid Heise
> > Date:2020/10/05 15:53:01
> > Recipient:dev
> > Cc:
> > Theme:Re: [ANNOUNCE] New PMC member: Zhu Zhu
> >
> > Congratulations Zhu Zhu!
> >
> > On Mon, Oct 5, 2020 at 9:28 AM Till Rohrmann 
> wrote:
> >
> > > Congrats and welcome Zhu Zhu!
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Oct 5, 2020 at 7:32 AM Robert Metzger 
> > wrote:
> > >
> > > > Hi all!
> > > >
> > > > I'm very happy to announce that Zhu Zhu has joined the Flink PMC!
> > > >
> > > > Zhu is helping the community a lot with creating and validating
> > releases,
> > > > contributing to FLIP discussions and good code contributions to the
> > > > scheduler and related components.
> > > >
> > > > Congratulations and welcome Zhu Zhu!
> > > >
> > > > Regards,
> > > > Robert
> > > >
> > >
> >
> >
> > --
> >
> > Arvid Heise | Senior Java Developer
> >
> > 
> >
> > Follow us @VervericaData
> >
> > --
> >
> > Join Flink Forward  - The Apache Flink
> > Conference
> >
> > Stream Processing | Event Driven | Real Time
> >
> > --
> >
> > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >
> > --
> > Ververica GmbH
> > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> > (Toni) Cheng
> >
> >
>


Re: [VOTE] FLIP-143: Unified Sink API

2020-09-29 Thread Guowei Ma
Hi all,

The voting time for FLIP-143 [1] has passed. I'm closing the vote now.

There were 5 votes, 4 of which are binding:

- Aljoscha Krettek (binding)
- Steven Wu
- Kostas Kloudas (binding)
- Jingsong Li (binding)
- Jiangang Liu (binding)

There were no -1 votes.

Thus, changes have been accepted. I'll update the FLIP doc accordingly.

There is still an open discussion about the `CommitResult`/`GlobalCommitter`
interface, which we could continue during the implementation phase.

Thanks everyone for participating and discussing

[1] https://cwiki.apache.org/confluence/x/KEJ4CQ

Best,
Guowei


On Tue, Sep 29, 2020 at 6:32 PM 刘建刚  wrote:

> +1 (binding)
>
> Best,
> Liu Jiangang
>
> On Tue, Sep 29, 2020 at 1:36 PM, Jingsong Li wrote:
>
> > +1 (binding)
> >
> > Best,
> > Jingsong
> >
> > On Mon, Sep 28, 2020 at 3:21 AM Kostas Kloudas 
> wrote:
> >
> > > +1 (binding)
> > >
> > > @Steven Wu I think there will be opportunities to fine tune the API
> > > during the implementation.
> > >
> > > Cheers,
> > > Kostas
> > >
> > > On Sun, Sep 27, 2020 at 7:56 PM Steven Wu 
> wrote:
> > > >
> > > > +1 (non-binding)
> > > >
> > > > Although I would love to continue the discussion for tweaking the
> > > > CommitResult/GlobaCommitter interface maybe during the implementation
> > > phase.
> > > >
> > > > On Fri, Sep 25, 2020 at 5:35 AM Aljoscha Krettek <
> aljos...@apache.org>
> > > > wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > Aljoscha
> > > > >
> > > > > On 25.09.20 14:26, Guowei Ma wrote:
> > > > > >  From the discussion[1] we could find that FLIP focuses on
> > providing
> > > an
> > > > > > unified transactional sink API. So I updated the FLIP's title to
> > > "Unified
> > > > > > Transactional Sink API". But I found that the old link could not
> be
> > > > > opened
> > > > > > again.
> > > > > >
> > > > > > I would update the link[2] here. Sorry for the inconvenience.
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > >
> >
> https://lists.apache.org/thread.html/rf09dfeeaf35da5ee98afe559b5a6e955c9f03ade0262727f6b5c4c1e%40%3Cdev.flink.apache.org%3E
> > > > > > [2] https://cwiki.apache.org/confluence/x/KEJ4CQ
> > > > > >
> > > > > > Best,
> > > > > > Guowei
> > > > > >
> > > > > >
> > > > > > On Thu, Sep 24, 2020 at 8:13 PM Guowei Ma 
> > > wrote:
> > > > > >
> > > > > >> Hi, all
> > > > > >>
> > > > > >> After the discussion in [1], I would like to open a voting
> thread
> > > for
> > > > > >> FLIP-143 [2], which proposes a unified sink api.
> > > > > >>
> > > > > >> The vote will be open until September 29th (72h + weekend),
> unless
> > > there
> > > > > >> is an objection or not enough votes.
> > > > > >>
> > > > > >> [1]
> > > > > >>
> > > > >
> > >
> >
> https://lists.apache.org/thread.html/rf09dfeeaf35da5ee98afe559b5a6e955c9f03ade0262727f6b5c4c1e%40%3Cdev.flink.apache.org%3E
> > > > > >> [2]
> > > > > >>
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
> > > > > >>
> > > > > >> Best,
> > > > > >> Guowei
> > > > > >>
> > > > > >
> > > > >
> > > > >
> > >
> >
> >
> > --
> > Best, Jingsong Lee
> >
>


Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-26 Thread Guowei Ma
Hi Steven

Thank you very much for your detailed explanation.

Now I get your point: I can see that there are benefits in committing a
collection of `GlobalCommT` as a whole when the external metastore
environment is temporarily unstable.

But I have two small concerns about committing the collection of
`GlobalCommT`:

1. Option 1: CommitResult commit(List<GlobalCommT>). This option implies
that users should commit the collection of `GlobalCommT` as a whole.
But maybe not every system can do that as a whole; for example, renaming
some files cannot. If that is the case, I think people would keep asking
the same question I asked in the previous mail.

2. Option 2: List<GlobalCommT> commit(List<GlobalCommT>). This option is
clearer than the first one. But IMHO it only has benefits when the external
metastore is unstable and we want to retry many times without failing the
job. Maybe we should not retry so many times and end up with a lot of
uncommitted `GlobalCommT`. If that is the case, maybe we should make the
API clearer and simpler for the normal scenario. In addition, there is only
one global committer instance, so I think the external system can bear the
pressure.

So personally I would say we might keep the API simpler at the beginning,
in 1.12.

What do you think?

Best,
Guowei


On Fri, Sep 25, 2020 at 9:30 PM Steven Wu  wrote:

> I should clarify my last email a little more.
>
> For the example of commits for checkpoints 1-100 failed, the job is still
> up (processing records and uploading files). When commit for checkpoint 101
> came, IcebergSink would prefer the framework to pass in all 101 GlobalCommT
> (100 old + 1 new) so that it can commit all of them in one transaction. it
> is more efficient than 101 separate transactions.
>
> Maybe the GlobalCommitter#commit semantics is to give the sink all
> uncommitted GlobalCommT items and let sink implementation decide whether to
> retry one by one or in a single transaction. It could mean that we need to
> expand the CommitResult (e.g. a list for each result type, SUCCESS,
> FAILURE, RETRY) interface. We can also start with the simple enum style
> result for the whole list for now. If we need to break the experimental
> API, it is also not a big deal since we only need to update a few sink
> implementations.
>
> Thanks,
> Steven
>
> On Fri, Sep 25, 2020 at 5:56 AM Steven Wu  wrote:
>
> > > 1. The frame can not know which `GlobalCommT` to retry if we use the
> > > List as parameter when the `commit` returns `RETRY`.
> > > 2. Of course we can let the `commit` return more detailed info but it
> > might
> > > be too complicated.
> >
> > If commit(List) returns RETRY, it means the whole list needs
> > to be retried. E.g. we have some outage with metadata service, commits
> for
> > checkpoints 1-100 failed. We can accumulate 100 GlobalCommT items. we
> don't
> > want to commit them one by one. It is faster to commit the whole list as
> > one batch.
> >
> > > 3. On the other hand, I think only when restoring IcebergSink needs a
> > > collection of `GlobalCommT` and giving back another collection of
> > > `GlobalCommT` that are not committed
> >
> > That is when the job restarted due to failure or deployment.
> >
> >
> > On Fri, Sep 25, 2020 at 5:24 AM Guowei Ma  wrote:
> >
> >> Hi, all
> >>
> >> From the above discussion we could find that FLIP focuses on providing
> an
> >> unified transactional sink API. So I updated the FLIP's title to
> "Unified
> >> Transactional Sink API". But I found that the old link could not be
> opened
> >> again.
> >>
> >> I would update the link[1] here. Sorry for the inconvenience.
> >>
> >> [1]https://cwiki.apache.org/confluence/x/KEJ4CQ
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> On Fri, Sep 25, 2020 at 3:26 PM Guowei Ma  wrote:
> >>
> >> > Hi, Steven
> >> >
> >> > >>I also have a clarifying question regarding the WriterStateT. Since
> >> > >>IcebergWriter won't need to checkpoint any state, should we set it
> to
> >> > *Void*
> >> > >>type? Since getWriterStateSerializer() returns Optional, that is
> clear
> >> > and
> >> > >>we can return Optional.empty().
> >> >
> >> > Yes I think you could do it. If you return Optional.empty() we would
> >> > ignore all the state you return.
> >> >
> >> > Best,
> >> > Guowei
> >> >
> >> >
> >> > On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma 
&g

Re: [VOTE] FLIP-143: Unified Sink API

2020-09-25 Thread Guowei Ma
From the discussion[1] we can see that the FLIP focuses on providing a
unified transactional sink API. So I updated the FLIP's title to "Unified
Transactional Sink API". But I found that the old link could not be opened
again.

I would update the link[2] here. Sorry for the inconvenience.

[1]
https://lists.apache.org/thread.html/rf09dfeeaf35da5ee98afe559b5a6e955c9f03ade0262727f6b5c4c1e%40%3Cdev.flink.apache.org%3E
[2] https://cwiki.apache.org/confluence/x/KEJ4CQ

Best,
Guowei


On Thu, Sep 24, 2020 at 8:13 PM Guowei Ma  wrote:

> Hi, all
>
> After the discussion in [1], I would like to open a voting thread for
> FLIP-143 [2], which proposes a unified sink api.
>
> The vote will be open until September 29th (72h + weekend), unless there
> is an objection or not enough votes.
>
> [1]
> https://lists.apache.org/thread.html/rf09dfeeaf35da5ee98afe559b5a6e955c9f03ade0262727f6b5c4c1e%40%3Cdev.flink.apache.org%3E
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
>
> Best,
> Guowei
>


Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-25 Thread Guowei Ma
Hi, all

From the above discussion we can see that the FLIP focuses on providing a
unified transactional sink API. So I updated the FLIP's title to "Unified
Transactional Sink API". But I found that the old link could not be opened
again.

I would update the link[1] here. Sorry for the inconvenience.

[1]https://cwiki.apache.org/confluence/x/KEJ4CQ

Best,
Guowei


On Fri, Sep 25, 2020 at 3:26 PM Guowei Ma  wrote:

> Hi, Steven
>
> >>I also have a clarifying question regarding the WriterStateT. Since
> >>IcebergWriter won't need to checkpoint any state, should we set it to
> *Void*
> >>type? Since getWriterStateSerializer() returns Optional, that is clear
> and
> >>we can return Optional.empty().
>
> Yes I think you could do it. If you return Optional.empty() we would
> ignore all the state you return.
>
> Best,
> Guowei
>
>
> On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma  wrote:
>
>> Hi,Steven
>>
>> Thank you for reading the FLIP so carefully.
>> 1. The frame can not know which `GlobalCommT` to retry if we use the
>> List as parameter when the `commit` returns `RETRY`.
>> 2. Of course we can let the `commit` return more detailed info but it
>> might be too complicated.
>> 3. On the other hand, I think only when restoring IcebergSink needs a
>> collection of `GlobalCommT` and giving back another collection of
>> `GlobalCommT` that are not committed.
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu  wrote:
>>
>>> Guowei,
>>>
>>> Thanks a lot for updating the wiki page. It looks great.
>>>
>>> I noticed one inconsistency in the wiki with your last summary email for
>>> GlobalCommitter interface. I think the version in the summary email is
>>> the
>>> intended one, because rollover from previous failed commits can
>>> accumulate
>>> a list.
>>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
>>> =>
>>> CommitResult commit(List globalCommittable);  // in the
>>> summary email
>>>
>>> I also have a clarifying question regarding the WriterStateT. Since
>>> IcebergWriter won't need to checkpoint any state, should we set it to
>>> *Void*
>>> type? Since getWriterStateSerializer() returns Optional, that is clear
>>> and
>>> we can return Optional.empty().
>>>
>>> Thanks,
>>> Steven
>>>
>>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma  wrote:
>>>
>>> > Thanks Aljoscha for your suggestion.  I have updated FLIP. Any
>>> comments are
>>> > welcome.
>>> >
>>> > Best,
>>> > Guowei
>>> >
>>> >
>>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek 
>>> > wrote:
>>> >
>>> > > Yes, that sounds good! I'll probably have some comments on the FLIP
>>> > > about the names of generic parameters and the Javadoc but we can
>>> address
>>> > > them later or during implementation.
>>> > >
>>> > > I also think that we probably need the FAIL,RETRY,SUCCESS result for
>>> > > globalCommit() but we can also do that as a later addition.
>>> > >
>>> > > So I think we're good to go to update the FLIP, do any last minute
>>> > > changes and then vote.
>>> > >
>>> > > Best,
>>> > > Aljoscha
>>> > >
>>> > > On 23.09.20 06:13, Guowei Ma wrote:
>>> > > > Hi, all
>>> > > >
>>> > > > Thank everyone very much for your ideas and suggestions. I would
>>> try to
>>> > > > summarize again the consensus :). Correct me if I am wrong or
>>> > > misunderstand
>>> > > > you.
>>> > > >
>>> > > > ## Consensus-1
>>> > > >
>>> > > > 1. The motivation of the unified sink API is to decouple the sink
>>> > > > implementation from the different runtime execution mode.
>>> > > > 2. The initial scope of the unified sink API only covers the file
>>> > system
>>> > > > type, which supports the real transactions. The FLIP focuses more
>>> on
>>> > the
>>> > > > semantics the new sink api should support.
>>> > > > 3. We prefer the first alternative API, which could give the
>>> framework
>>> > a
>>> > &

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-25 Thread Guowei Ma
Hi, Steven

>>I also have a clarifying question regarding the WriterStateT. Since
>>IcebergWriter won't need to checkpoint any state, should we set it to
*Void*
>>type? Since getWriterStateSerializer() returns Optional, that is clear and
>>we can return Optional.empty().

Yes, I think you could do that. If you return Optional.empty(), we will
ignore any state you return.
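
As a minimal illustration (my own sketch, assuming WriterStateT = Void for
a stateless writer; the class name is hypothetical):

import java.util.Optional;
import org.apache.flink.core.io.SimpleVersionedSerializer;

class StatelessWriterSketch {
    // No writer state to snapshot, so no serializer is provided.
    public Optional<SimpleVersionedSerializer<Void>> getWriterStateSerializer() {
        return Optional.empty();
    }
}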

Best,
Guowei


On Fri, Sep 25, 2020 at 3:14 PM Guowei Ma  wrote:

> Hi,Steven
>
> Thank you for reading the FLIP so carefully.
> 1. The frame can not know which `GlobalCommT` to retry if we use the
> List as parameter when the `commit` returns `RETRY`.
> 2. Of course we can let the `commit` return more detailed info but it
> might be too complicated.
> 3. On the other hand, I think only when restoring IcebergSink needs a
> collection of `GlobalCommT` and giving back another collection of
> `GlobalCommT` that are not committed.
>
> Best,
> Guowei
>
>
> On Fri, Sep 25, 2020 at 1:45 AM Steven Wu  wrote:
>
>> Guowei,
>>
>> Thanks a lot for updating the wiki page. It looks great.
>>
>> I noticed one inconsistency in the wiki with your last summary email for
>> GlobalCommitter interface. I think the version in the summary email is the
>> intended one, because rollover from previous failed commits can accumulate
>> a list.
>> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
>> =>
>> CommitResult commit(List globalCommittable);  // in the
>> summary email
>>
>> I also have a clarifying question regarding the WriterStateT. Since
>> IcebergWriter won't need to checkpoint any state, should we set it to
>> *Void*
>> type? Since getWriterStateSerializer() returns Optional, that is clear and
>> we can return Optional.empty().
>>
>> Thanks,
>> Steven
>>
>> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma  wrote:
>>
>> > Thanks Aljoscha for your suggestion.  I have updated FLIP. Any comments
>> are
>> > welcome.
>> >
>> > Best,
>> > Guowei
>> >
>> >
>> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek 
>> > wrote:
>> >
>> > > Yes, that sounds good! I'll probably have some comments on the FLIP
>> > > about the names of generic parameters and the Javadoc but we can
>> address
>> > > them later or during implementation.
>> > >
>> > > I also think that we probably need the FAIL,RETRY,SUCCESS result for
>> > > globalCommit() but we can also do that as a later addition.
>> > >
>> > > So I think we're good to go to update the FLIP, do any last minute
>> > > changes and then vote.
>> > >
>> > > Best,
>> > > Aljoscha
>> > >
>> > > On 23.09.20 06:13, Guowei Ma wrote:
>> > > > Hi, all
>> > > >
>> > > > Thank everyone very much for your ideas and suggestions. I would
>> try to
>> > > > summarize again the consensus :). Correct me if I am wrong or
>> > > misunderstand
>> > > > you.
>> > > >
>> > > > ## Consensus-1
>> > > >
>> > > > 1. The motivation of the unified sink API is to decouple the sink
>> > > > implementation from the different runtime execution mode.
>> > > > 2. The initial scope of the unified sink API only covers the file
>> > system
>> > > > type, which supports the real transactions. The FLIP focuses more on
>> > the
>> > > > semantics the new sink api should support.
>> > > > 3. We prefer the first alternative API, which could give the
>> framework
>> > a
>> > > > greater opportunity to optimize.
>> > > > 4. The `Writer` needs to add a method `prepareCommit`, which would
>> be
>> > > > called from `prepareSnapshotPreBarrier`. And remove the `Flush`
>> method.
>> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to be
>> > more
>> > > > focused.
>> > > >
>> > > > ## Consensus-2
>> > > >
>> > > > 1. What should the “Unified Sink API” support/cover? It includes two
>> > > > aspects. 1. The same sink implementation would work for both the
>> batch
>> > > and
>> > > > stream execution mode. 2. In the long run we should give the sink
>> > > developer
>> > > > the ability of building “arbitrary” topologies. But for Flink-1.12
>> 

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-25 Thread Guowei Ma
Hi, Steven

Thank you for reading the FLIP so carefully.
1. The framework cannot know which `GlobalCommT` to retry if we use
List<GlobalCommT> as the parameter when `commit` returns `RETRY`.
2. Of course we could let `commit` return more detailed info, but it might
be too complicated.
3. On the other hand, I think it is only when restoring that IcebergSink
needs to receive a collection of `GlobalCommT` and give back another
collection of `GlobalCommT` that are not yet committed.

Best,
Guowei


On Fri, Sep 25, 2020 at 1:45 AM Steven Wu  wrote:

> Guowei,
>
> Thanks a lot for updating the wiki page. It looks great.
>
> I noticed one inconsistency in the wiki with your last summary email for
> GlobalCommitter interface. I think the version in the summary email is the
> intended one, because rollover from previous failed commits can accumulate
> a list.
> CommitResult commit(GlobalCommT globalCommittable); // in the wiki
> =>
> CommitResult commit(List globalCommittable);  // in the
> summary email
>
> I also have a clarifying question regarding the WriterStateT. Since
> IcebergWriter won't need to checkpoint any state, should we set it to
> *Void*
> type? Since getWriterStateSerializer() returns Optional, that is clear and
> we can return Optional.empty().
>
> Thanks,
> Steven
>
> On Wed, Sep 23, 2020 at 6:59 PM Guowei Ma  wrote:
>
> > Thanks Aljoscha for your suggestion.  I have updated FLIP. Any comments
> are
> > welcome.
> >
> > Best,
> > Guowei
> >
> >
> > On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek 
> > wrote:
> >
> > > Yes, that sounds good! I'll probably have some comments on the FLIP
> > > about the names of generic parameters and the Javadoc but we can
> address
> > > them later or during implementation.
> > >
> > > I also think that we probably need the FAIL,RETRY,SUCCESS result for
> > > globalCommit() but we can also do that as a later addition.
> > >
> > > So I think we're good to go to update the FLIP, do any last minute
> > > changes and then vote.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > On 23.09.20 06:13, Guowei Ma wrote:
> > > > Hi, all
> > > >
> > > > Thank everyone very much for your ideas and suggestions. I would try
> to
> > > > summarize again the consensus :). Correct me if I am wrong or
> > > misunderstand
> > > > you.
> > > >
> > > > ## Consensus-1
> > > >
> > > > 1. The motivation of the unified sink API is to decouple the sink
> > > > implementation from the different runtime execution mode.
> > > > 2. The initial scope of the unified sink API only covers the file
> > system
> > > > type, which supports the real transactions. The FLIP focuses more on
> > the
> > > > semantics the new sink api should support.
> > > > 3. We prefer the first alternative API, which could give the
> framework
> > a
> > > > greater opportunity to optimize.
> > > > 4. The `Writer` needs to add a method `prepareCommit`, which would be
> > > > called from `prepareSnapshotPreBarrier`. And remove the `Flush`
> method.
> > > > 5. The FLIP could move the `Snapshot & Drain` section in order to be
> > more
> > > > focused.
> > > >
> > > > ## Consensus-2
> > > >
> > > > 1. What should the “Unified Sink API” support/cover? It includes two
> > > > aspects. 1. The same sink implementation would work for both the
> batch
> > > and
> > > > stream execution mode. 2. In the long run we should give the sink
> > > developer
> > > > the ability of building “arbitrary” topologies. But for Flink-1.12 we
> > > > should be more focused on only satisfying the S3/HDFS/Iceberg sink.
> > > > 2. Because the batch execution mode does not have the normal
> checkpoint
> > > the
> > > > sink developer should not depend on it any more if we want a unified
> > > sink.
> > > > 3. We can benefit by providing an asynchronous `Writer` version. But
> > > > because the unified sink is already very complicated, we don’t add
> this
> > > in
> > > > the first version.
> > > >
> > > >
> > > > According to these consensus I would propose the first version of the
> > new
> > > > sink api as follows. What do you think? Any comments are welcome.
> > > >
> > > > /**
> > > >   * This interface lets the sink developer build a simple
>

[VOTE] FLIP-143: Unified Sink API

2020-09-24 Thread Guowei Ma
Hi, all

After the discussion in [1], I would like to open a voting thread for
FLIP-143 [2], which proposes a unified sink api.

The vote will be open until September 29th (72h + weekend), unless there is
an objection or not enough votes.

[1]
https://lists.apache.org/thread.html/rf09dfeeaf35da5ee98afe559b5a6e955c9f03ade0262727f6b5c4c1e%40%3Cdev.flink.apache.org%3E
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

Best,
Guowei


Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-23 Thread Guowei Ma
Thanks Aljoscha for your suggestion. I have updated the FLIP. Any comments
are welcome.

Best,
Guowei


On Wed, Sep 23, 2020 at 4:25 PM Aljoscha Krettek 
wrote:

> Yes, that sounds good! I'll probably have some comments on the FLIP
> about the names of generic parameters and the Javadoc but we can address
> them later or during implementation.
>
> I also think that we probably need the FAIL,RETRY,SUCCESS result for
> globalCommit() but we can also do that as a later addition.
>
> So I think we're good to go to update the FLIP, do any last minute
> changes and then vote.
>
> Best,
> Aljoscha
>
> On 23.09.20 06:13, Guowei Ma wrote:
> > Hi, all
> >
> > Thank everyone very much for your ideas and suggestions. I would try to
> > summarize again the consensus :). Correct me if I am wrong or
> misunderstand
> > you.
> >
> > ## Consensus-1
> >
> > 1. The motivation of the unified sink API is to decouple the sink
> > implementation from the different runtime execution mode.
> > 2. The initial scope of the unified sink API only covers the file system
> > type, which supports the real transactions. The FLIP focuses more on the
> > semantics the new sink api should support.
> > 3. We prefer the first alternative API, which could give the framework a
> > greater opportunity to optimize.
> > 4. The `Writer` needs to add a method `prepareCommit`, which would be
> > called from `prepareSnapshotPreBarrier`. And remove the `Flush` method.
> > 5. The FLIP could move the `Snapshot & Drain` section in order to be more
> > focused.
> >
> > ## Consensus-2
> >
> > 1. What should the “Unified Sink API” support/cover? It includes two
> > aspects. 1. The same sink implementation would work for both the batch
> and
> > stream execution mode. 2. In the long run we should give the sink
> developer
> > the ability of building “arbitrary” topologies. But for Flink-1.12 we
> > should be more focused on only satisfying the S3/HDFS/Iceberg sink.
> > 2. Because the batch execution mode does not have the normal checkpoint
> the
> > sink developer should not depend on it any more if we want a unified
> sink.
> > 3. We can benefit by providing an asynchronous `Writer` version. But
> > because the unified sink is already very complicated, we don’t add this
> in
> > the first version.
> >
> >
> > According to these consensus I would propose the first version of the new
> > sink api as follows. What do you think? Any comments are welcome.
> >
> > /**
> >   * This interface lets the sink developer build a simple transactional
> sink
> > topology pattern, which satisfies the HDFS/S3/Iceberg sink.
> >   * This sink topology includes one {@link Writer} + one {@link
> Committer} +
> > one {@link GlobalCommitter}.
> >   * The {@link Writer} is responsible for producing the committable.
> >   * The {@link Committer} is responsible for committing a single
> > committables.
> >   * The {@link GlobalCommitter} is responsible for committing an
> aggregated
> > committable, which we called global committables.
> >   *
> >   * But both the {@link Committer} and the {@link GlobalCommitter} are
> > optional.
> >   */
> > interface TSink {
> >
> >  Writer createWriter(InitContext
> initContext);
> >
> >  Writer restoreWriter(InitContext
> initContext,
> > List states);
> >
> >  Optional> createCommitter();
> >
> >  Optional>
> createGlobalCommitter();
> >
> >  SimpleVersionedSerializer getCommittableSerializer();
> >
> >  Optional>
> > getGlobalCommittableSerializer();
> > }
> >
> > /**
> >   * The {@link GlobalCommitter} is responsible for committing an
> aggregated
> > committable, which we called global committables.
> >   */
> > interface GlobalCommitter {
> >
> >  /**
> >   * This method is called when restoring from a failover.
> >   * @param globalCommittables the global committables that are
> not
> > committed in the previous session.
> >   * @return the global committables that should be committed
> again
> > in the current session.
> >   */
> >  List filterRecoveredCommittables(List
> > globalCommittables);
> >
> >  /**
> >   * Compute an aggregated committable from a collection of
> > committables.
> >   * @param committables a collection of committables that are
> needed
> > to combine
> >   * @return an aggregated committable
> >   */
> >  GCommT combine(List committables);
> >
> >  void commit(List globalCommittables);
> >
> >  /**
> >   * There are no committables any more.
> >   */
> >  void endOfInput();
> > }
> >
> > Best,
> > Guowei
>
>


[jira] [Created] (FLINK-19376) `table.generated-code.max-length` and `table.sql-dialec` 's document is needed to be updated

2020-09-23 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19376:
-

 Summary: `table.generated-code.max-length` and `table.sql-dialec` 
's document is needed to be updated
 Key: FLINK-19376
 URL: https://issues.apache.org/jira/browse/FLINK-19376
 Project: Flink
  Issue Type: Task
  Components: Documentation
Affects Versions: 1.12.0
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19375) The `kubernetes.secrets` and `kubernetes.env.secretKeyRef`'s document is missing

2020-09-23 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19375:
-

 Summary: The `kubernetes.secrets` and 
`kubernetes.env.secretKeyRef`'s document is missing
 Key: FLINK-19375
 URL: https://issues.apache.org/jira/browse/FLINK-19375
 Project: Flink
  Issue Type: Task
  Components: Documentation
Affects Versions: 1.12.0
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19374) update the `table.exec.state.ttl`'s documentation

2020-09-23 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19374:
-

 Summary: update the `table.exec.state.ttl`'s documentation
 Key: FLINK-19374
 URL: https://issues.apache.org/jira/browse/FLINK-19374
 Project: Flink
  Issue Type: Task
  Components: Documentation
Affects Versions: 1.12.0
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19373) `jmx.server.port`'s document is missing.

2020-09-23 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19373:
-

 Summary: `jmx.server.port`'s document is missing.
 Key: FLINK-19373
 URL: https://issues.apache.org/jira/browse/FLINK-19373
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.12.0
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Guowei Ma
Hi, all

Thanks everyone very much for your ideas and suggestions. I will try to
summarize the consensus again :). Correct me if I am wrong or have
misunderstood you.

## Consensus-1

1. The motivation of the unified sink API is to decouple the sink
implementation from the different runtime execution mode.
2. The initial scope of the unified sink API only covers the file system
type, which supports the real transactions. The FLIP focuses more on the
semantics the new sink api should support.
3. We prefer the first alternative API, which could give the framework a
greater opportunity to optimize.
4. The `Writer` needs to add a method `prepareCommit`, which would be
called from `prepareSnapshotPreBarrier`. And remove the `Flush` method.
5. The FLIP could move the `Snapshot & Drain` section in order to be more
focused.

## Consensus-2

1. What should the “Unified Sink API” support/cover? It includes two
aspects. 1. The same sink implementation would work for both the batch and
stream execution modes. 2. In the long run we should give the sink developer
the ability to build “arbitrary” topologies. But for Flink 1.12 we
should focus only on satisfying the S3/HDFS/Iceberg sinks.
2. Because the batch execution mode does not have the normal checkpoint, the
sink developer should not depend on it any more if we want a unified sink.
3. We can benefit by providing an asynchronous `Writer` version. But
because the unified sink is already very complicated, we don’t add this in
the first version.


According to these consensus I would propose the first version of the new
sink api as follows. What do you think? Any comments are welcome.

/**
 * This interface lets the sink developer build a simple transactional sink
 * topology pattern, which satisfies the HDFS/S3/Iceberg sink.
 * This sink topology includes one {@link Writer} + one {@link Committer} +
 * one {@link GlobalCommitter}.
 * The {@link Writer} is responsible for producing the committable.
 * The {@link Committer} is responsible for committing a single committable.
 * The {@link GlobalCommitter} is responsible for committing an aggregated
 * committable, which we call the global committable.
 *
 * But both the {@link Committer} and the {@link GlobalCommitter} are
 * optional.
 */
interface TSink<T, CommT, GCommT, WriterStateT> {

    Writer<T, CommT, WriterStateT> createWriter(InitContext initContext);

    Writer<T, CommT, WriterStateT> restoreWriter(InitContext initContext,
            List<WriterStateT> states);

    Optional<Committer<CommT>> createCommitter();

    Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();

    SimpleVersionedSerializer<CommT> getCommittableSerializer();

    Optional<SimpleVersionedSerializer<GCommT>> getGlobalCommittableSerializer();
}

/**
 * The {@link GlobalCommitter} is responsible for committing an aggregated
 * committable, which we call the global committable.
 */
interface GlobalCommitter<CommT, GCommT> {

    /**
     * This method is called when restoring from a failover.
     * @param globalCommittables the global committables that are not
     * committed in the previous session.
     * @return the global committables that should be committed again
     * in the current session.
     */
    List<GCommT> filterRecoveredCommittables(List<GCommT> globalCommittables);

    /**
     * Compute an aggregated committable from a collection of committables.
     * @param committables a collection of committables that are needed
     * to combine
     * @return an aggregated committable
     */
    GCommT combine(List<CommT> committables);

    void commit(List<GCommT> globalCommittables);

    /**
     * There are no committables any more.
     */
    void endOfInput();
}
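
To make the intended flow a bit more concrete, here is a purely
illustrative, self-contained sketch (all names below are mine, not part of
the proposal) of a toy global committer that aggregates per-file
committables into one manifest and skips already-committed manifests on
recovery:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ToyGlobalCommitterSketch {

    // Stand-in for GCommT: a manifest id plus the data files it covers.
    public static class Manifest {
        final String id;
        final List<String> dataFiles;

        Manifest(String id, List<String> dataFiles) {
            this.id = id;
            this.dataFiles = dataFiles;
        }
    }

    // Pretend "external metastore": ids of manifests already committed.
    private final Set<String> committedIds = new HashSet<>();
    private long nextId = 0;

    // Mirrors filterRecoveredCommittables: keep only what never made it.
    public List<Manifest> filterRecoveredCommittables(List<Manifest> recovered) {
        List<Manifest> stillPending = new ArrayList<>();
        for (Manifest m : recovered) {
            if (!committedIds.contains(m.id)) {
                stillPending.add(m);
            }
        }
        return stillPending;
    }

    // Mirrors combine: aggregate per-file committables into one manifest.
    public Manifest combine(List<String> dataFiles) {
        return new Manifest("manifest-" + (nextId++), new ArrayList<>(dataFiles));
    }

    // Mirrors commit: committing the same manifest twice is a no-op.
    public void commit(List<Manifest> globalCommittables) {
        for (Manifest m : globalCommittables) {
            committedIds.add(m.id);
        }
    }

    public void endOfInput() {
        // nothing buffered in this toy sketch
    }
}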

Best,
Guowei


On Wed, Sep 23, 2020 at 12:03 PM Guowei Ma  wrote:

> >> I think we should go with something like
>
> >> List filterRecoveredCommittables(List<>)
>
> >> to keep things simple. This should also be easy to do from the framework
> >> side and then the sink doesn't need to do any custom state handling.
>
> I second Aljoscha's  proposal. For the first version there is already much
> stuff to do.
> For now I think it would be satisfied with IceBerg Sink.
>
> Best,
> Guowei
>
>
> On Tue, Sep 22, 2020 at 10:54 PM Aljoscha Krettek 
> wrote:
>
>> I think we should go with something like
>>
>> List filterRecoveredCommittables(List<>)
>>
>> to keep things simple. This should also be easy to do from the framework
>> side and then the sink doesn't need to do any custom state handling.
>>
>> Best,
>> Aljoscha
>>
>> On 22.09.20 16:03, Steven Wu wrote:
>> > Previous APIs discussed have been trying to do more in the framework.
>> If we
>> > take a different approach to a lighter framework, these sets of
>> > minimal APIs are probably good enough. Sink can handle the bookkeeping,
>> > merge, retry logics.
>> >
>> > /**
>> >   * CommT is the DataFile in Iceberg
>> > 

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Guowei Ma
>> I think we should go with something like

>> List filterRecoveredCommittables(List<>)

>> to keep things simple. This should also be easy to do from the framework
>> side and then the sink doesn't need to do any custom state handling.

I second Aljoscha's proposal. For the first version there is already a lot
of stuff to do.
For now I think it would be sufficient for the Iceberg sink.

Best,
Guowei


On Tue, Sep 22, 2020 at 10:54 PM Aljoscha Krettek 
wrote:

> I think we should go with something like
>
> List filterRecoveredCommittables(List<>)
>
> to keep things simple. This should also be easy to do from the framework
> side and then the sink doesn't need to do any custom state handling.
>
> Best,
> Aljoscha
>
> On 22.09.20 16:03, Steven Wu wrote:
> > Previous APIs discussed have been trying to do more in the framework. If
> we
> > take a different approach to a lighter framework, these sets of
> > minimal APIs are probably good enough. Sink can handle the bookkeeping,
> > merge, retry logics.
> >
> > /**
> >   * CommT is the DataFile in Iceberg
> >   * GlobalCommT is the checkpoint data type, like ManifestFile in Iceberg
> > */
> > interface GlobalCommitter {
> >
> >void collect(CommT);
> >
> >void commit();
> >
> >List snapshotState();
> >
> >// this is just a callback to sink so that it can perform filter and
> > retain the uncommitted GlobalCommT in the internal bookkeeping
> >void recoveredCommittables(List) ;
> > }
> >
> > The most important need from the framework is to run GlobalCommitter in
> the
> > jobmanager. It involves the topology creation, checkpoint handling,
> > serializing the executions of commit() calls etc.
> >
> > Thanks,
> > Steven
> >
> > On Tue, Sep 22, 2020 at 6:39 AM Steven Wu  wrote:
> >
> >> It is fine to leave the CommitResult/RETRY outside the scope of
> framework.
> >> Then the framework might need to provide some hooks in the
> >> checkpoint/restore logic. because the commit happened in the post
> >> checkpoint completion step, sink needs to update the internal state when
> >> the commit is successful so that the next checkpoint won't include the
> >> committed GlobalCommT.
> >>
> >> Maybe GlobalCommitter can have an API like this?
> >>> List snapshotState();
> >>
> >> But then we still need the recover API if we don't let sink directly
> >> manage the state.
> >>> List recoverCommittables(List)
> >>
> >> Thanks,
> >> Steven
> >>
> >> On Tue, Sep 22, 2020 at 6:33 AM Aljoscha Krettek 
> >> wrote:
> >>
> >>> On 22.09.20 13:26, Guowei Ma wrote:
> >>>> Actually I am not sure adding `isAvailable` is enough. Maybe it is
> not.
> >>>> But for the initial version I hope we could make the sink api sync
> >>> because
> >>>> there is already a lot of stuff that has to finish. :--)
> >>>
> >>> I agree, for the first version we should stick to a simpler synchronous
> >>> interface.
> >>>
> >>> Aljoscha
> >>>
> >>
> >
>
>


Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Guowei Ma
>> I believe that we could support such an async sink writer
>> very easily in the future. What do you think?

>> How would you see the expansion in the future? Do you mean just adding
`isAvailable()` method with a default implementation later on?

Hi @Piotr

Actually I am not sure whether adding `isAvailable` is enough. Maybe it is
not. But for the initial version I hope we can keep the sink API
synchronous, because there is already a lot of stuff that has to be
finished. :--)
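
Just to make sure we are talking about the same kind of addition, a rough
sketch of what a later, optional `isAvailable` could look like (the Writer
shape below is simplified and the names are mine, not from the FLIP):

import java.util.List;

interface WriterSketch<InputT, CommT> {
    void write(InputT element) throws Exception;

    List<CommT> prepareCommit() throws Exception;

    // Hypothetical future addition: an availability hint for the runtime.
    // Returning true keeps today's blocking behaviour unchanged.
    default boolean isAvailable() {
        return true;
    }
}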

What do you think?

Best,
Guowei


On Tue, Sep 22, 2020 at 5:25 PM Aljoscha Krettek 
wrote:

> On 22.09.20 11:10, Guowei Ma wrote:
> > 1. I think maybe we could add a EOI interface to the `GlobalCommit`. So
> > that we could make `write success file` be available in both batch and
> > stream execution mode.
>
> We could, yes. I'm now hesitant because we're adding more things but I
> think it should be fine.
>
> > 2.  If we choose to let the two types of committer appear at the same
> time
> > in the API we have to figure out how to express the relation between the
> > two committers. I think the Sink API may look like the following: What do
> > you think?
> > Sink {
> >  Writer createWriter();
> >  Optional> createCommitter();
> >  Optional>
> > createGlobalCommitter();
> > }
>
> Yes, I think this is what we should do. Though I think that we should
> initially not support shared state. The FileSink only uses this to
> create unique file names and I think we can do without it. If we see
> that we need it later we can add it but I would like to keep things
> minimal initially. It's always easy to add things later but it's hard to
> take things away once you added them.
>
> > 3. Maybe a silly question: Why do we need `commit` return
> `CommitResult`? I
> > think the sink developer could rety himself. Why need the framework to do
> > the retry?
>
> It's not a silly question at all! I think we need the retry to support
> such problems as Steven mentioned. If a commit fails a RETRY tells the
> framework that it should keep the commits in state and retry them on the
> next checkpoint. When the committer returns FAILURE we should just fail
> the job. It's to support temporary outages of the external metastore.
>
> I'm open to leaving it out of the initial version for the same reasons I
> mentioned above but I think it could be valuable.
>
> Best,
> Aljoscha
>
>


Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-22 Thread Guowei Ma
Thanks @Aljoscha for the summary. I agree we should postpone the discussion
of the sink topology and focus on the normal file sink and IcebergSink for
Flink 1.12.

I have three small questions:

1. I think maybe we could add an EOI (end of input) interface to the
`GlobalCommit`, so that we could make "write success file" available in
both batch and stream execution mode.
2. If we choose to let the two types of committer appear at the same time
in the API, we have to figure out how to express the relation between the
two committers. I think the Sink API may look like the following. What do
you think?
Sink {
    Writer<T, CommT> createWriter();
    Optional<Committer<CommT>> createCommitter();
    Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter();
}
3. Maybe a silly question: why do we need `commit` to return `CommitResult`?
I think the sink developer could retry himself. Why does the framework need
to do the retry?

Best,
Guowei


On Tue, Sep 22, 2020 at 4:47 PM Aljoscha Krettek 
wrote:

> Ah sorry, I think I now see what you mean. I think it's ok to add a
> `List recoverCommittables(List)`
> method.
>
>
> On 22.09.20 09:42, Aljoscha Krettek wrote:
> > On 22.09.20 06:06, Steven Wu wrote:
> >> In addition, it is undesirable to do the committed-or-not check in the
> >> commit method, which happens for each checkpoint cycle. CommitResult
> >> already indicates SUCCESS or not. when framework calls commit with a
> list
> >> of GlobalCommittableT, it should be certain they are uncommitted. The
> >> only
> >> time we aren't sure is when a list of  GlobalCommittableT is restored
> >> from
> >> a checkpoint. `*recoverGlobalCommittables*` is the ideal place to do
> >> such a
> >> check and filter out the ones that were already committed. Retained ones
> >> will be committed in the next checkpoint cycle. Since framework takes
> >> care
> >> of the checkpoint and restore, we need some hook for the sink to add the
> >> custom logic on the restored list.
> >
> > I think we don't need the `recoverGlobalCommittables()` hook. The sink
> > implementation would have to do the filtering once, so it can either do
> > it in the recover hook or it could do it in the next `commit()` call.
> > Both of these would mean we only have to do one pass through the list
> > and connect to Iceberg. Doing the check in `commit()` would mean the
> > interface of GlobalCommittable is simpler and to me it seems natural
> > that we do the check in the commit() method to ensure that commits are
> > idempotent.
> >
> > What do you think?
> >
> > Aljoscha
>
>


Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-20 Thread Guowei Ma
I would like to summarize the file-type sinks in the thread and their
possible topologies. I also try to give the pros and cons of every topology
option. Correct me if I am wrong.

### FileSink

Topology Option: TmpFileWriter + Committer.

### Iceberg Sink

#### Topology Option 1: `DataFileWriter` + `GlobalCommitterV0`.
Pro:
1. Same group has some id.
Cons:
1. May limit users’ optimization space;
2. The topology does not meet Hive’s requirements.

#### Topology Option 2: `DataFileWriter` + `GlobalCommitterV1`
Pro:
1. The user has the opportunity to optimize the implementation of idempotence
Cons:
1. Makes the GlobalCommit more complicated.
2. The topology does not meet Hive’s requirements

### Topology Option 3: DataFileWriter + AggWriter + Committer

Pros:
1. Uses the two basic `Writer` & `Committer` to meet Iceberg’s requirements.
2. Opportunity to optimize the implementation of idempotence
3. The topology meets Hive’s requirements. (See the following.)
Con:
1. It introduces a relatively complex topology

## HiveSink

### Topology Option1: `TmpFileWriter` + `Committer` + `GlobalCommitterV2`.
Pro:
1. Could skip the cleanup problem at first.
Con:
1. This style topology does not meet the CompactHiveSink requirements.

### Topology Option2: `TmpFileWriter` + `Committer` + `AggWriter` +
`Committer`
Pros
1. Could skip the cleanup problem at first.
2. Decouples the GlobalCommitterV2 into `AggWriter` + `Committer`
Cons
1. This style of topology does not meet the CompactHiveSink requirements.
2. There are two general `Committer`s in the topology. For Hive’s case
there might be no problem. But there might be a problem in 1.12, for
example where to execute the sub-topology following the `Committer` in
batch execution mode for the general case. Because the topology is built
from `Writer` and `Committer`, we might put the whole sub-topology in the
OperatorCoordinator. But if the topology is complicated, that might become
hard to handle. See the following.

### Topology Option3 `FileWriter` + `AggWriter` + `Committer`
Pro
1. There is only one general committer.
Cons
1. It has to consider the cleanup problem. (In theory both the Option1 and
Option2 need to cleanup)
2. This style topology does not meet the CompactHiveSink requirements.
3. Have to figure out how to make the current version compatible.

### CompactHiveSink/MergeHiveSink

#### Topology Option1 `TmpFileWriter` + `Committer` + `MergerCoordinator` +
`MergeWriter` + `GlobalCommitterV2`
Pro
1. Could skip the cleanup problem at first.
Con
1. Where to execute the sub-topology following the `Committer`.

#### Topology Option2 `TmpFileWriter` + `Committer` + `MergerCoordinator` +
`MergeWriter` + AggWriter + Committer
Pros
1. Could skip the cleanup problem at first
2. Decouples the GlobalCommitterV2 into `AggWriter` + `Committer`
Con
1. Where to execute the sub-topology following the `Committer`.

### Option3 FileWriter + MergeCoordinator + MergeFileWriter + Writer(Agg) +
Committer
Pro
1. There is only one committer. It is very easy to support in the batch
execution mode.
Con
1. It has to consider the cleanup problem. (In theory both Option1 and
Option2 need cleanup)


### Summary

From the above we could divide the sink topology into two parts:
1. The write topology.
2. And one committer.

So we could provide a unified sink API that looks like the following:

public interface Sink<T, CommT> {
    List<Writer<T, CommT>> getWriters();
    Committer<CommT> createCommitter();
}

In the long run maybe we could give the user a more powerful ability like
this (currently some transformations still belong to the runtime):

Sink {
    Transformation createWriteTopology();
    CommitFunction createCommitter();
}
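
For illustration only, a tiny self-contained sketch (all names below are
mine, not from the thread) of how the plain FileSink option above could map
onto this two-part shape, i.e. a set of tmp-file writers plus one committer
that publishes the written files:

import java.util.Collections;
import java.util.List;

public class FileSinkShapeSketch {

    interface Writer<T, CommT> {
        void write(T element);
        List<CommT> prepareCommit();
    }

    interface Committer<CommT> {
        void commit(List<CommT> committables);
    }

    interface Sink<T, CommT> {
        List<Writer<T, CommT>> getWriters();
        Committer<CommT> createCommitter();
    }

    // CommT here is simply the path of a finished tmp file.
    static Sink<String, String> fileSink() {
        return new Sink<String, String>() {
            public List<Writer<String, String>> getWriters() {
                // Writers would normally be created per parallel subtask.
                return Collections.emptyList();
            }

            public Committer<String> createCommitter() {
                // Publishing = renaming tmp files to their final names.
                return new Committer<String>() {
                    public void commit(List<String> tmpFiles) {
                        for (String tmpFile : tmpFiles) {
                            System.out.println("rename " + tmpFile);
                        }
                    }
                };
            }
        };
    }
}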

Best,
Guowei


On Sun, Sep 20, 2020 at 6:09 PM Guowei Ma  wrote:

> Hi, Stevn
> I want to make a clarification first, the following reply only considers
> the Iceberge sink, but does not consider other sinks.  Before make decision
> we should consider all the sink.I would try to summary all the sink
> requirments in the next mail
>
>
> >>  run global committer in jobmanager (e.g. like sink coordinator)
>
> I think it could be.
>
>
> >> You meant GlobalCommit -> GlobalCommT, right?
>
> Yes. Thanks :)
>
>
> >> Is this called when restored from checkpoint/savepoint?
>
> Yes.
>
>
> >>Iceberg sink needs to do a dup check here on which GlobalCommT were
> committed and which weren't. Should it return the filtered/de-duped list of
> GlobalCommT?
>
>
> I think Iceberg sink needs to do the dedup in the `commit` call. The
> `recoveredGlobalCommittables` is just for restoring the ids.
>
>
> >> Sink implementation can decide if it wants to commit immediately or
> just leave
>
> I think only the frame knows *when* call the commit function.
>
>
> >>should this be "commit(List)"?
>
> It could be. thanks.
>
>
> Best,
> Guowei
>
>
> 

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-20 Thread Guowei Ma
Hi, Steven
I want to make a clarification first: the following reply only considers
the Iceberg sink, not the other sinks. Before making a decision we should
consider all the sinks. I will try to summarize all the sink requirements
in the next mail.


>>  run global committer in jobmanager (e.g. like sink coordinator)

I think it could be.


>> You meant GlobalCommit -> GlobalCommT, right?

Yes. Thanks :)


>> Is this called when restored from checkpoint/savepoint?

Yes.


>>Iceberg sink needs to do a dup check here on which GlobalCommT were
committed and which weren't. Should it return the filtered/de-duped list of
GlobalCommT?


I think Iceberg sink needs to do the dedup in the `commit` call. The
`recoveredGlobalCommittables` is just for restoring the ids.


>> Sink implementation can decide if it wants to commit immediately or just
leave

I think only the framework knows *when* to call the commit function.


>>should this be "commit(List)"?

It could be. thanks.


Best,
Guowei


On Sun, Sep 20, 2020 at 12:11 AM Steven Wu  wrote:

> > I prefer to let the developer produce id to dedupe. I think this gives
> the developer more opportunity to optimize.
>
> Thinking about it again, I totally agree with Guowei on this. We don't
> really need the framework to generate the unique id for Iceberg sink.
> De-dup logic is totally internal to Iceberg sink and should be isolated
> inside. My earlier question regarding "commitGlobally(List)
> can be concurrent or not" also becomes irrelevant, as long as the framework
> handles the GlobalCommT list properly (even with concurrent calls).
>
> Here are the things where framework can help
>
>1. run global committer in jobmanager (e.g. like sink coordinator)
>2. help with checkpointing, bookkeeping, commit failure handling,
>recovery
>
>
> @Guowei Ma  regarding the GlobalCommitter
> interface, I have some clarifying questions.
>
> > void recoveredGlobalCommittables(List globalCommits)
>
>1. You meant GlobalCommit -> GlobalCommT, right?
>2. Is this called when restored from checkpoint/savepoint?
>3.  Iceberg sink needs to do a dup check here on which GlobalCommT
>were committed and which weren't. Should it return the filtered/de-duped
>list of GlobalCommT?
>4. Sink implementation can decide if it wants to commit immediately or
>just leave
>
> > void commit(GlobalCommit globalCommit);
>
> should this be "commit(List)"?
>
> Thanks,
> Steven
>
>
> On Sat, Sep 19, 2020 at 1:56 AM Guowei Ma  wrote:
>
>> Hi, all
>>
>> >>Just to add to what Aljoscha said regarding the unique id. Iceberg sink
>> >>checkpoints the unique id into state during snapshot. It also inserts
>> the
>> >>unique id into the Iceberg snapshot metadata during commit. When a job
>> >>restores the state after failure, it needs to know if the restored
>> >>transactions/commits were successful or not. It basically iterates
>> through
>> >>the list of table snapshots from Iceberg and matches the unique ids with
>> >>what is stored in Iceberg snapshot metadata.
>>
>> Thanks Steven for these detailed explanations. It makes me know the
>> IceBerg
>> better. However, I prefer to let the developer produce id to dedupe. I
>> think this gives the developer more opportunity to optimize. You could see
>> the following for more details. Please correct me if I misunderstand you.
>>
>> >> 3. Whether the `Writer` supports async functionality or not. Currently
>> I
>> do
>> >> not know which sink could benefit from it. Maybe it is just my own
>> problem.
>>
>> >> Here, I don't really know. We can introduce an "isAvailable()" method
>> >> and mostly ignore it for now and sinks can just always return true. Or,
>> >> as an alternative, we don't add the method now but can add it later
>> with
>> >> a default implementation. Either way, we will probably not take
>> >> advantage of the "isAvailable()" now because that would require more
>> >> runtime changes.
>>
>> From the @Pitor's explanation I could see the other benefit that might be
>> gained in the future. For example decoupling the task number and the
>> thread
>> number. But I have to admit that introducing `isAvailable` might introduce
>> some complications in the runtime. You could see my alternative API option
>> in the following. I believe that we could support such an async sink
>> writer
>> very easily in the future. What do you think?
>>
>> >> Ye

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-19 Thread Guowei Ma
Hi, all

>>Just to add to what Aljoscha said regarding the unique id. Iceberg sink
>>checkpoints the unique id into state during snapshot. It also inserts the
>>unique id into the Iceberg snapshot metadata during commit. When a job
>>restores the state after failure, it needs to know if the restored
>>transactions/commits were successful or not. It basically iterates through
>>the list of table snapshots from Iceberg and matches the unique ids with
>>what is stored in Iceberg snapshot metadata.

Thanks Steven for these detailed explanations. They help me understand
Iceberg better. However, I prefer to let the developer produce the id used
to dedupe. I think this gives the developer more opportunity to optimize.
You can see the following for more details. Please correct me if I
misunderstand you.

>> 3. Whether the `Writer` supports async functionality or not. Currently I
do
>> not know which sink could benefit from it. Maybe it is just my own
problem.

>> Here, I don't really know. We can introduce an "isAvailable()" method
>> and mostly ignore it for now and sinks can just always return true. Or,
>> as an alternative, we don't add the method now but can add it later with
>> a default implementation. Either way, we will probably not take
>> advantage of the "isAvailable()" now because that would require more
>> runtime changes.

From @Piotr's explanation I can see the other benefit that might be
gained in the future, for example decoupling the task count from the
thread count. But I have to admit that introducing `isAvailable` might
introduce some complications in the runtime. You can see my alternative
API option in the following. I believe that we could support such an async
sink writer very easily in the future. What do you think?

>> Yes, this is still tricky. What is the current state, would the
>> introduction of a "LocalCommit" and a "GlobalCommit" already solve both
>> the Iceberg and Hive cases? I believe Hive is the most tricky one here,
>> but if we introduce the "combine" method on GlobalCommit, that could
>> serve the same purpose as the "aggregation operation" on the individual
>> files, and we could even execute that "combine" in a distributed way.
>>We assume that GlobalCommit is a Agg/Combiner?

I would like to share the possible problems that I am currently seeing,
and the alternative options.

## Iceberg Sink

### Concern about generating the nonce in the framework

If we let the `GlobalCommitter` provide a random nonce for the `IcebergSink`,
I think it might not be efficient. Because even if there is only a very
small number of committables in the state, you still need to iterate over
all the Iceberg snapshot files to check whether a committable has already
been committed. Even if it is efficient for the IcebergSink, it might not
be the case for other sinks.

If the framework generates an auto-increment nonce instead, it might still
not be optimal for users. For example, users might want to use some
business id so that after a failover they can query whether the commit was
successful.

I think users could generate a more efficient nonce, such as an
auto-increment one. Therefore, it seems to provide more optimization
opportunities if we let users generate the nonce.


### Alternative Option

public interface GlobalCommit<CommT, GlobalCommT> {
    // provide some runtime context such as attempt-id, job-id, task-id.
    void open(InitContext context);

    // This GlobalCommit would aggregate the committables into a
    // GlobalCommT before doing the commit operation.
    GlobalCommT combine(List<CommT> committables);

    // This method would be called after committing all the
    // GlobalCommT produced in the previous session.
    void recoveredGlobalCommittables(List<GlobalCommT> globalCommits);

    // the developer would guarantee the idempotency by himself
    void commit(GlobalCommT globalCommit);
}

The user could guarantee the idempotency himself in a more efficient or
application-specific way. If the user wants the `GlobalCommit` to be
executed in a distributed way, the user could use the runtime information
to generate a partially ordered id himself. (We could ignore the cleanup
for now.)
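
As a purely illustrative, self-contained sketch (all names are mine): a
sink-side id generator built from the runtime context passed to
open(InitContext), giving each GlobalCommT a unique, ordered identifier
that the external system can use for cheap dedup after a failover:

import java.util.concurrent.atomic.AtomicLong;

public class CommitIdGenerator {
    private final String jobId;
    private final int subtaskId;
    private final AtomicLong counter = new AtomicLong();

    public CommitIdGenerator(String jobId, int subtaskId) {
        this.jobId = jobId;
        this.subtaskId = subtaskId;
    }

    public String next() {
        // e.g. "a1b2c3-0-17": ordered per subtask, unique across the job,
        // so "is anything newer already committed?" is a single lookup.
        return jobId + "-" + subtaskId + "-" + counter.getAndIncrement();
    }
}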

Currently the sink might look like the following:

Sink<T, CommT, GlobalCommT> {
    Writer<T, CommT> createWriter();
    Optional<Committer<CommT>> createCommitter();
    Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter();
}

## Hive

The HiveSink needs to compute whether a directory is finished or not. But
the HiveSink cannot use the above `combine` method to decide whether a
directory is finished or not.

For example, assume that whether a directory is finished is decided by the
event time. There might be a topology where the source and sink are
connected by a forward edge. The event time might be different in
different instances of the `Writer`. So the GlobalCommit’s combine cannot
produce a GlobalCommT when the snapshot happens.

In addition to the above case we should also consider the unaligned
checkpoint. Because the watermark does not skip, there might be the same
problem in the unaligned checkpoint.

###

Re: Re: [ANNOUNCE] Apache Flink 1.11.2 released

2020-09-18 Thread Guowei Ma
Thanks Zhuzhu for driving the release!!!

Best,
Guowei


On Fri, Sep 18, 2020 at 5:10 PM Yun Gao  wrote:

> Great! Very thanks @ZhuZhu for driving this and thanks for all contributed
> to the release!
>
> Best,
>  Yun
>
> --Original Mail --
> *Sender:*Jingsong Li 
> *Send Date:*Thu Sep 17 13:31:41 2020
> *Recipients:*user-zh 
> *CC:*dev , user , Apache
> Announce List 
> *Subject:*Re: [ANNOUNCE] Apache Flink 1.11.2 released
>
>> Thanks ZhuZhu for driving the release.
>>
>> Best,
>> Jingsong
>>
>> On Thu, Sep 17, 2020 at 1:29 PM Zhu Zhu  wrote:
>>
>>> The Apache Flink community is very happy to announce the release of
>>> Apache
>>> Flink 1.11.2, which is the second bugfix release for the Apache Flink
>>> 1.11
>>> series.
>>>
>>> Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data
>>> streaming
>>> applications.
>>>
>>> The release is available for download at:
>>> https://flink.apache.org/downloads.html
>>>
>>> Please check out the release blog post for an overview of the
>>> improvements
>>> for this bugfix release:
>>> https://flink.apache.org/news/2020/09/17/release-1.11.2.html
>>>
>>> The full release notes are available in Jira:
>>>
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348575
>>>
>>> We would like to thank all contributors of the Apache Flink community who
>>> made this release possible!
>>>
>>> Thanks,
>>> Zhu
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>


Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-16 Thread Guowei Ma
Hi, Steven

I am not particularly sure whether we should provide an id in GlobalCommit.

But my understanding is: if the committer function is idempotent, the
framework can guarantee exactly-once semantics in both batch and stream
execution mode. But I think maybe the idempotence should be guaranteed by
the sink developer, not by the basic API.
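
As a minimal sketch of what an idempotent commit can look like (my own
illustration, not from the FLIP): publishing a file with a "skip if the
final file already exists" check, so replaying the same committable after
a failover is a harmless no-op.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class IdempotentCommitSketch {

    static void commit(Path tmpFile, Path finalFile) throws IOException {
        if (Files.exists(finalFile)) {
            // Already committed by a previous attempt; nothing to do.
            return;
        }
        Files.move(tmpFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
    }
}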

We could provide an id in GlobalCommit. But the follow-up question would
be: do we also need to provide an id for a normal committable? I would
like to say that I prefer to keep the committer to a single responsibility.

I think maybe we will have an answer once the first non-consensus question
is resolved.

The above is just my personal opinion. I think this is still an open
question.

Thank you again for your valuable and thoughtful response.

Best,
Guowei


On Thu, Sep 17, 2020 at 10:53 AM Steven Wu  wrote:

> Guowei, thanks a lot for the summary. Here are a couple more questions that
> need more clarification for the GlobalCommitter case.
>
> * framework provides some sort of unique id per GlobalCommT (e.g. nonce or
> some sort of transaction id)
> * commit failure handling. Should we roll over to the next cycle? if so, we
> may need commit(List )
>
> On Wed, Sep 16, 2020 at 2:11 AM Piotr Nowojski 
> wrote:
>
> > Hey
> >
> > Thanks Dawid for bringing up my suggestion :)
> >
> > > I'm not so sure about this, the sinks I'm aware of would not be able to
> > > implement this method: Kafka doesn't have this, I didn't see it in the
> > > Iceberg interfaces, and HDFS/S3 also don't have it.
> >
> > Aljoscha, as I wrote, FlinkKafkaProducer is actually one for which we
> > could do some magic. At the very least we could use
> > `FlinkKafkaProducer#pendingRecords` to make the sink unavailable when
> some
> > threshold is exceeded. Alternatively, maybe we could hook in to the
> > KafkaProducer's buffer state [1]:
> >
> > > The buffer.memory controls the total amount of memory available to the
> > producer for buffering.
> > > If records are sent faster than they can be transmitted to the server
> > then this buffer space will be exhausted.
> > > When the buffer space is exhausted additional send calls will block.
> >
> > As far as I can see, Kafka is exposing the `buffer-available-bytes`
> > metric, which we might use instead of `pendingRecords`. Heck, we are
> > already hacking KafkaProducer with reflections, we could access
> > `org.apache.kafka.clients.producer.KafkaProducer#accumulator` field to
> > call  `accumulator.bufferPoolAvailableMemory()` method, if metric would
> be
> > to expensive to check per every record.
> >
> > Furthermore, I'm pretty sure other sinks (maybe not all) provide similar
> > features. If we are desperate, we could always contribute something to
> > those systems to make them expose the internal buffer's state.
> >
> > If we are really desperate, we could provide a generic records handover
> > wrapper sink, that would have a buffer of N (5? 10? ) records and would
> be
> > handing over those records to the blocking sink running in another
> thread.
> > If the buffer is full, the sink would be unavailable.
> >
> > Guowei
> > > Does the sink's snapshot return immediately when the sink's status is
> > unavailable?
> >
> > State snapshot call is generally speaking non blocking already, so it
> > should not be an issue. If it's blocking and if it will be solving some
> > problem, we could later decide in the runtime code to not execute
> snapshot
> > calls if a sink is unavailable. Think about isAvailable more like a hint
> > from the operator to the runtime, which we can use to make better
> > decisions. Also take a look at the FLIP-27 sources (`SourceReader`),
> where
> > there already is `isAvailable()` method. It would be best if new sinks
> > would just duplicate the same contract.
> >
> > > For me I want to know is what specific sink will benefit from this
> > feature
> >
> > It's not the sinks that would benefit from this, but other parts of the
> > system. Currently task thread is blocked on backpressured Sink, it's
> > blocking some things from happening (checkpointing, closing, ...). If we
> > make sinks non blocking (as is the network stack in the most part and as
> > are the FLIP-27 sources), we will be able to snapshot state of the
> operator
> > immediately. For example, change from blocking to non blocking sources
> was
> > speeding up unaligned checkpoints from ~30seconds down to ~5seconds in
> > our benchmarks, but the difference can be even more profound (

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-16 Thread Guowei Ma
ources was
> speeding up unaligned checkpoints from ~30seconds down to ~5seconds in
> our benchmarks, but the difference can be even more profound (hours instead
> of seconds/minutes as reported by some users).
>
> Piotrek
>
> [1]
>
> https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
>
> śr., 16 wrz 2020 o 06:29 Guowei Ma  napisał(a):
>
> > Hi,all
> >
> > Thanks for all your valuable options and ideas.Currently there are many
> > topics in the mail. I try to summarize what is consensus and what is not.
> > Correct me if I am wrong.
> >
> > ## Consensus
> >
> > 1. The motivation of the unified sink API is to decouple the sink
> > implementation from the different runtime execution mode.
> > 2. The initial scope of the unified sink API only covers the file system
> > type, which supports the real transactions. The FLIP focuses more on the
> > semantics the new sink api should support.
> > 3. We prefer the first alternative API, which could give the framework a
> > greater opportunity to optimize.
> > 4. The `Writer` needs to add a method `prepareCommit`, which would be
> > called from `prepareSnapshotPreBarrier`. And remove the `Flush` method.
> > 5. The FLIP could move the `Snapshot & Drain` section in order to be more
> > focused.
> >
> > ## Not Consensus
> >
> > 1. What should the “Unified Sink API” support/cover? The API can
> > “unify” (decouple) the commit operation in terms of supporting exactly
> > once semantics. However, even if we narrow down the initially supported
> > system to the file system, there would be different topology requirements.
> > These requirements come from performance optimization
> > (IcebergSink/MergeHiveSink) or functionality (i.e. whether a bucket is
> > “finished”). Should the unified sink API support these requirements?
> > 2. The API does not expose the checkpoint-id because the batch execution
> > mode does not have the normal checkpoint. But there are still some
> > implementations that depend on it (the IcebergSink uses it to do some dedupe).
> > I think how to support this requirement depends on the first open
> > question.
> > 3. Whether the `Writer` supports async functionality or not. Currently I
> do
> > not know which sink could benefit from it. Maybe it is just my own
> problem.
> >
> > Best,
> > Guowei
> >
> >
> > On Wed, Sep 16, 2020 at 12:02 PM Guowei Ma  wrote:
> >
> > >
> > > Hi, Steven
> > > Thank you for your thoughtful ideas and concerns.
> > >
> > > >>I still like the concept of grouping data files per checkpoint for
> > > streaming mode. it is cleaner and probably easier to manage and deal
> with
> > > commit failures. Plus, it >>can reduce dupes for the at least once
> > > >>mode.  I understand checkpoint is not an option for batch execution.
> We
> > > don't have to expose the checkpointId in API, as >>long as  the
> internal
> > > bookkeeping groups data files by checkpoints for streaming >>mode.
> > >
> > > I think this problem (how to dedupe the combined committed data) also
> > > depends on where to place the agg/combine logic.
> > >
> > > 1. If the agg/combine takes place in the “commit” maybe we need to
> figure
> > > out how to give the aggregated committable a unique and auto-increment
> id
> > > in the committer.
> > > 2. If the agg/combine takes place in a separate operator maybe sink
> > > developer could maintain the id itself by using the state.
> > >
> > > I think this problem is also decided by what topology patterns the
> > sink
> > > API should support. Actually there are already many other topology
> > > requirements. :)
> > >
> > > Best,
> > > Guowei
> > >
> > >
> > > On Wed, Sep 16, 2020 at 7:46 AM Steven Wu 
> wrote:
> > >
> > >> > AFAIK the committer would not see the file-1-2 when ck1 happens in
> the
> > >> ExactlyOnce mode.
> > >>
> > >> @Guowei Ma  I think you are right for exactly
> > once
> > >> checkpoint semantics. what about "at least once"? I guess we can argue
> > that
> > >> it is fine to commit file-1-2 for at least once mode.
> > >>
> > >> I still like the concept of grouping data files per checkpoint for
> > >> streaming mode. it is cleaner and probably easier to manage and deal
> > with
> > >> commit failures. P

[jira] [Created] (FLINK-19261) Update document according to RestOptions

2020-09-16 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19261:
-

 Summary: Update document according to RestOptions
 Key: FLINK-19261
 URL: https://issues.apache.org/jira/browse/FLINK-19261
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19260) Update documentation based on bin/flink output

2020-09-16 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19260:
-

 Summary: Update documentation based on bin/flink output
 Key: FLINK-19260
 URL: https://issues.apache.org/jira/browse/FLINK-19260
 Project: Flink
  Issue Type: Bug
  Components: Command Line Client, Documentation
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Re: [VOTE] FLIP-134: Batch execution for the DataStream API

2020-09-16 Thread Guowei Ma
+1 (non-binding)

Best,
Guowei


On Wed, Sep 16, 2020 at 3:03 PM Yun Gao 
wrote:

> +1 (non-binding)
>
> Many thanks for bringing this up! And the FLIP is indeed necessary for stream
> & batch unification.
>
> --
> Sender:Dawid Wysakowicz
> Date:2020/09/16 15:01:08
> Recipient:; Aljoscha Krettek
> Theme:Re: [VOTE] FLIP-134: Batch execution for the DataStream API
>
> +1 (binding)
>
> On 14/09/2020 14:25, Aljoscha Krettek wrote:
> >
> > Hi all,
> >
> > After the discussion in [1], I would like to open a voting thread for
> > FLIP-134 (https://s.apache.org/FLIP-134) [2] which discusses a new
> > BATCH execution mode for the DataStream API.
> >
> > The vote will be open until September 17, unless there is an objection
> > or not enough votes.
> >
> > Regards,
> > Aljoscha
> >
> > [1]
> >
> https://lists.apache.org/thread.html/reb368f095ec13638b95cd5d885a0aa8e69af06d6e982a5f045f50022%40%3Cdev.flink.apache.org%3E
> > [2] https://cwiki.apache.org/confluence/x/4i94CQ
>
>
>


Re: [ANNOUNCE] New Apache Flink Committer - Niels Basjes

2020-09-15 Thread Guowei Ma
Congratulations :)

Best,
Guowei


On Tue, Sep 15, 2020 at 6:14 PM Matthias Pohl 
wrote:

> Congrats!
>
> Best,
> Matthias
>
> On Tue, Sep 15, 2020 at 9:26 AM Dawid Wysakowicz 
> wrote:
>
> > Welcome, Niels!
> >
> > Best,
> >
> > Dawid
> >
> > On 14/09/2020 11:22, Matt Wang wrote:
> > > Congratulations, Niels!
> > >
> > >
> > > --
> > >
> > > Best,
> > > Matt Wang
> > >
> > >
> > > On 09/14/2020 17:02,Konstantin Knauf wrote:
> > > Congratulations!
> > >
> > > On Mon, Sep 14, 2020 at 10:51 AM tison  wrote:
> > >
> > > Congrats!
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Aljoscha Krettek  wrote on Mon, Sep 14, 2020 at 4:38 PM:
> > >
> > > Congratulations! 💐
> > >
> > > Aljoscha
> > >
> > > On 14.09.20 10:37, Robert Metzger wrote:
> > > Hi all,
> > >
> > > On behalf of the PMC, I’m very happy to announce Niels Basjes as a new
> > > Flink committer.
> > >
> > > Niels has been an active community member since the early days of
> > > Flink,
> > > with 19 commits dating back until 2015.
> > > Besides his work on the code, he has been driving initiatives on dev@
> > > list,
> > > supporting users and giving talks at conferences.
> > >
> > > Please join me in congratulating Niels for becoming a Flink committer!
> > >
> > > Best,
> > > Robert Metzger
> > >
> > >
> > >
> > >
> > >
> > >
> > > --
> > >
> > > Konstantin Knauf | Head of Product
> > >
> > > +49 160 91394525
> > >
> > >
> > > Follow us @VervericaData Ververica 
> > >
> > >
> > > --
> > >
> > > Join Flink Forward  - The Apache Flink
> > > Conference
> > >
> > > Stream Processing | Event Driven | Real Time
> > >
> > > --
> > >
> > > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> > >
> > > --
> > > Ververica GmbH
> > > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > > Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl
> Anton
> > > Wehner
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - Arvid Heise

2020-09-15 Thread Guowei Ma
Congratulations :)

Best,
Guowei


On Tue, Sep 15, 2020 at 6:41 PM 刘建刚  wrote:

> Congratulations!
>
> Best
>
> Matthias Pohl  wrote on Tue, Sep 15, 2020 at 6:07 PM:
>
> > Congratulations! ;-)
> >
> > On Tue, Sep 15, 2020 at 11:47 AM Xingbo Huang 
> wrote:
> >
> > > Congratulations!
> > >
> > > Best,
> > > Xingbo
> > >
> > > Igal Shilman  wrote on Tue, Sep 15, 2020 at 5:44 PM:
> > >
> > > > Congrats Arvid!
> > > >
> > > > On Tue, Sep 15, 2020 at 11:12 AM David Anderson <
> da...@alpinegizmo.com
> > >
> > > > wrote:
> > > >
> > > > > Congratulations, Arvid! Well deserved.
> > > > >
> > > > > Best,
> > > > > David
> > > > >
> > > > > On Tue, Sep 15, 2020 at 10:23 AM Paul Lam 
> > > wrote:
> > > > >
> > > > > > Congrats, Arvid!
> > > > > >
> > > > > > Best,
> > > > > > Paul Lam
> > > > > >
> > > > > > > On Sep 15, 2020, at 15:29, Jingsong Li  wrote:
> > > > > > >
> > > > > > > Congratulations Arvid !
> > > > > > >
> > > > > > > Best,
> > > > > > > Jingsong
> > > > > > >
> > > > > > > On Tue, Sep 15, 2020 at 3:27 PM Dawid Wysakowicz <
> > > > > dwysakow...@apache.org
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Congratulations Arvid! Very well deserved!
> > > > > > >>
> > > > > > >> Best,
> > > > > > >>
> > > > > > >> Dawid
> > > > > > >>
> > > > > > >> On 15/09/2020 04:38, Zhijiang wrote:
> > > > > > >>> Hi all,
> > > > > > >>>
> > > > > > >>> On behalf of the PMC, I’m very happy to announce Arvid Heise
> > as a
> > > > new
> > > > > > >> Flink committer.
> > > > > > >>>
> > > > > > >>> Arvid has been an active community member for more than a
> year,
> > > > with
> > > > > > 138
> > > > > > >> contributions including 116 commits, reviewed many PRs with
> good
> > > > > quality
> > > > > > >> comments.
> > > > > > >>> He is mainly working on the runtime scope, involved in
> critical
> > > > > > features
> > > > > > >> like task mailbox model and unaligned checkpoint, etc.
> > > > > > >>> Besides that, he was super active to reply questions in the
> > user
> > > > mail
> > > > > > >> list (34 emails in March, 51 emails in June, etc), also active
> > in
> > > > dev
> > > > > > mail
> > > > > > >> list and Jira issue discussions.
> > > > > > >>>
> > > > > > >>> Please join me in congratulating Arvid for becoming a Flink
> > > > > committer!
> > > > > > >>>
> > > > > > >>> Best,
> > > > > > >>> Zhijiang
> > > > > > >>
> > > > > > >>
> > > > > > >
> > > > > > > --
> > > > > > > Best, Jingsong Lee
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> >
> > Matthias Pohl | Engineer
> >
> > Follow us @VervericaData Ververica 
> >
> > --
> >
> > Join Flink Forward  - The Apache Flink
> > Conference
> >
> > Stream Processing | Event Driven | Real Time
> >
> > --
> >
> > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >
> > --
> > Ververica GmbH
> > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
> > Wehner
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - Yun Tang

2020-09-15 Thread Guowei Ma
Congratulations :)

Best,
Guowei


On Wed, Sep 16, 2020 at 11:54 AM Zhijiang
 wrote:

> Congratulations and welcome, Yun!
>
>
> --
> From:Jark Wu 
> Send Time: Wed, Sep 16, 2020 11:35
> To:dev 
> Cc:tangyun ; Yun Tang 
> Subject:Re: [ANNOUNCE] New Apache Flink Committer - Yun Tang
>
> Congratulations Yun!
>
> On Wed, 16 Sep 2020 at 10:40, Rui Li  wrote:
>
> > Congratulations Yun!
> >
> > On Wed, Sep 16, 2020 at 10:20 AM Paul Lam  wrote:
> >
> > > Congrats, Yun! Well deserved!
> > >
> > > Best,
> > > Paul Lam
> > >
> > > > On Sep 15, 2020, at 19:14, Yang Wang  wrote:
> > > >
> > > > Congratulations, Yun!
> > > >
> > > > Best,
> > > > Yang
> > > >
> > > > Leonard Xu  wrote on Tue, Sep 15, 2020 at 7:11 PM:
> > > >
> > > >> Congrats, Yun!
> > > >>
> > > >> Best,
> > > >> Leonard
> > > >>> On Sep 15, 2020, at 19:01, Yangze Guo  wrote:
> > > >>>
> > > >>> Congrats, Yun!
> > > >>
> > > >>
> > >
> > >
> >
> > --
> > Best regards!
> > Rui Li
> >
>
>


Re: Re: [ANNOUNCE] New Apache Flink Committer - Igal Shilman

2020-09-15 Thread Guowei Ma
Congratulations :)
Best,
Guowei


On Wed, Sep 16, 2020 at 11:54 AM Zhijiang
 wrote:

> Congratulations and welcome, Igal!
>
>
> --
> From:Yun Gao 
> Send Time: Wed, Sep 16, 2020 10:59
> To:Stephan Ewen ; dev 
> Subject:Re: Re: [ANNOUNCE] New Apache Flink Committer - Igal Shilman
>
> Congratulations Igal!
>
> Best,
>  Yun
>
>
>
>
>
>
> --
> Sender:Stephan Ewen
> Date:2020/09/15 22:48:30
> Recipient:dev
> Theme:Re: [ANNOUNCE] New Apache Flink Committer - Igal Shilman
>
> Welcome, Igal!
>
> On Tue, Sep 15, 2020 at 3:18 PM Seth Wiesman  wrote:
>
> > Congrats Igal!
> >
> > On Tue, Sep 15, 2020 at 7:13 AM Benchao Li  wrote:
> >
> > > Congratulations!
> > >
> > > > Zhu Zhu  wrote on Tue, Sep 15, 2020 at 6:51 PM:
> > >
> > > > Congratulations, Igal!
> > > >
> > > > Thanks,
> > > > Zhu
> > > >
> > > > > Rafi Aroch  wrote on Tue, Sep 15, 2020 at 6:43 PM:
> > > >
> > > > > Congratulations Igal! Well deserved!
> > > > >
> > > > > Rafi
> > > > >
> > > > >
> > > > > On Tue, Sep 15, 2020 at 11:14 AM Tzu-Li (Gordon) Tai <
> > > > tzuli...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > It's great seeing many new Flink committers recently, and to add
> to
> > > > that
> > > > > > I'd like to announce one more new committer: Igal Shilman!
> > > > > >
> > > > > > Igal has been a long time member of the community. You may very
> > > likely
> > > > > know
> > > > > > Igal from the Stateful Functions sub-project, as he was the
> > original
> > > > > author
> > > > > > of it before it was contributed to Flink.
> > > > > > Ever since StateFun was contributed to Flink, he has consistently
> > > > > > maintained the project and supported users in the mailing lists.
> > > > > > Before that, he had also helped tremendously in some work on
> > Flink's
> > > > > > serialization stack.
> > > > > >
> > > > > > Please join me in welcoming and congratulating Igal for becoming
> a
> > > > Flink
> > > > > > committer!
> > > > > >
> > > > > > Cheers,
> > > > > > Gordon
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
>


Re: [ANNOUNCE] New Apache Flink Committer - Godfrey He

2020-09-15 Thread Guowei Ma
Congratulations :)

Best,
Guowei


On Wed, Sep 16, 2020 at 12:19 PM Jark Wu  wrote:

> Hi everyone,
>
> It's great seeing many new Flink committers recently, and on behalf of the
> PMC,
> I'd like to announce one more new committer: Godfrey He.
>
> Godfrey is a very long time contributor in the Flink community since the
> end of 2016.
> He has been a very active contributor in the Flink SQL component with 153
> PRs and more than 571,414 lines which is quite outstanding.
> Godfrey has paid essential effort with SQL optimization and helped a lot
> during the blink merging.
> Besides that, he is also quite active with community work especially in
> Chinese mailing list.
>
> Please join me in congratulating Godfrey for becoming a Flink committer!
>
> Cheers,
> Jark Wu
>


Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-15 Thread Guowei Ma
Hi all,

Thanks for all your valuable opinions and ideas. Currently there are many
topics in the mail. I will try to summarize what is consensus and what is not.
Correct me if I am wrong.

## Consensus

1. The motivation of the unified sink API is to decouple the sink
implementation from the different runtime execution mode.
2. The initial scope of the unified sink API only covers the file system
type, which supports the real transactions. The FLIP focuses more on the
semantics the new sink api should support.
3. We prefer the first alternative API, which could give the framework a
greater opportunity to optimize.
4. The `Writer` needs to add a method `prepareCommit`, which would be
called from `prepareSnapshotPreBarrier`. And remove the `Flush` method.
5. The FLIP could move the `Snapshot & Drain` section in order to be more
focused.

## Not Consensus

1. What should the “Unified Sink API” support/cover? The API can
“unify” (decouple) the commit operation in terms of supporting exactly
once semantics. However, even if we narrow down the initially supported
system to the file system, there would be different topology requirements.
These requirements come from performance optimization
(IcebergSink/MergeHiveSink) or functionality (i.e. whether a bucket is
“finished”). Should the unified sink API support these requirements?
2. The API does not expose the checkpoint-id because the batch execution
mode does not have the normal checkpoint. But there are still some
implementations that depend on it (the IcebergSink uses it to do some dedupe).
I think how to support this requirement depends on the first open question.
3. Whether the `Writer` supports async functionality or not. Currently I do
not know which sink could benefit from it. Maybe it is just my own problem.

Best,
Guowei


On Wed, Sep 16, 2020 at 12:02 PM Guowei Ma  wrote:

>
> Hi, Steven
> Thank you for your thoughtful ideas and concerns.
>
> >>I still like the concept of grouping data files per checkpoint for
> streaming mode. it is cleaner and probably easier to manage and deal with
> commit failures. Plus, it >>can reduce dupes for the at least once
> >>mode.  I understand checkpoint is not an option for batch execution. We
> don't have to expose the checkpointId in API, as >>long as  the internal
> bookkeeping groups data files by checkpoints for streaming >>mode.
>
> I think this problem (how to dedupe the combined committed data) also
> depends on where to place the agg/combine logic.
>
> 1. If the agg/combine takes place in the “commit” maybe we need to figure
> out how to give the aggregated committable a unique and auto-increment id
> in the committer.
> 2. If the agg/combine takes place in a separate operator maybe sink
> developer could maintain the id itself by using the state.
>
> I think this problem is also decided by what topology patterns the sink
> API should support. Actually there are already many other topology
> requirements. :)
>
> Best,
> Guowei
>
>
> On Wed, Sep 16, 2020 at 7:46 AM Steven Wu  wrote:
>
>> > AFAIK the committer would not see the file-1-2 when ck1 happens in the
>> ExactlyOnce mode.
>>
>> @Guowei Ma  I think you are right for exactly once
>> checkpoint semantics. what about "at least once"? I guess we can argue that
>> it is fine to commit file-1-2 for at least once mode.
>>
>> I still like the concept of grouping data files per checkpoint for
>> streaming mode. it is cleaner and probably easier to manage and deal with
>> commit failures. Plus, it can reduce dupes for the at least once mode.  I
>> understand checkpoint is not an option for batch execution. We don't have
>> to expose the checkpointId in API, as long as  the internal bookkeeping
>> groups data files by checkpoints for streaming mode.
>>
>>
>> On Tue, Sep 15, 2020 at 6:58 AM Steven Wu  wrote:
>>
>>> > images don't make it through to the mailing lists. You would need to
>>> host the file somewhere and send a link.
>>>
>>> Sorry about that. Here is the sample DAG in google drawings.
>>>
>>> https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing
>>>
>>>
>>> On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma  wrote:
>>>
>>>> Hi, Dawid
>>>>
>>>> >>I still find the merging case the most confusing. I don't necessarily
>>>> understand why do you need the "SingleFileCommit" step in this scenario.
>>>> The way I
>>>> >> understand "commit" operation is that it makes some data/artifacts
>>>> visible to the external system, thus it should be immutable from a
>>>> poin

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-15 Thread Guowei Ma
Hi, Steven
Thank you for your thoughtful ideas and concerns.

>>I still like the concept of grouping data files per checkpoint for
streaming mode. it is cleaner and probably easier to manage and deal with
commit failures. Plus, it >>can reduce dupes for the at least once
>>mode.  I understand checkpoint is not an option for batch execution. We
don't have to expose the checkpointId in API, as >>long as  the internal
bookkeeping groups data files by checkpoints for streaming >>mode.

I think this problem (how to dedupe the combined committed data) also
depends on where to place the agg/combine logic.

1. If the agg/combine takes place in the “commit” maybe we need to figure
out how to give the aggregated committable a unique and auto-increment id
in the committer.
2. If the agg/combine takes place in a separate operator maybe sink
developer could maintain the id itself by using the state.
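(A rough sketch of option 2, assuming the id is kept in regular Flink operator state via CheckpointedFunction inside the sink/agg operator; the class name and the per-subtask counter are illustrative, not part of the proposed API.)

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    // Keeps a monotonically increasing id per subtask in operator state, so
    // the ids assigned to aggregated committables survive failover.
    public class CommittableIdGenerator implements CheckpointedFunction {

        private transient ListState<Long> checkpointedId;
        private long nextId;

        public long nextCommittableId() {
            return nextId++;
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedId.clear();
            checkpointedId.add(nextId);
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedId = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("next-committable-id", Long.class));
            for (Long restored : checkpointedId.get()) {
                nextId = Math.max(nextId, restored);
            }
        }
    }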

I think this problem is also decided by what topology patterns the sink
API should support. Actually there are already many other topology
requirements. :)

Best,
Guowei


On Wed, Sep 16, 2020 at 7:46 AM Steven Wu  wrote:

> > AFAIK the committer would not see the file-1-2 when ck1 happens in the
> ExactlyOnce mode.
>
> @Guowei Ma  I think you are right for exactly once
> checkpoint semantics. what about "at least once"? I guess we can argue that
> it is fine to commit file-1-2 for at least once mode.
>
> I still like the concept of grouping data files per checkpoint for
> streaming mode. it is cleaner and probably easier to manage and deal with
> commit failures. Plus, it can reduce dupes for the at least once mode.  I
> understand checkpoint is not an option for batch execution. We don't have
> to expose the checkpointId in API, as long as  the internal bookkeeping
> groups data files by checkpoints for streaming mode.
>
>
> On Tue, Sep 15, 2020 at 6:58 AM Steven Wu  wrote:
>
>> > images don't make it through to the mailing lists. You would need to
>> host the file somewhere and send a link.
>>
>> Sorry about that. Here is the sample DAG in google drawings.
>>
>> https://docs.google.com/drawings/d/1-P8F2jF9RG9HHTtAfWEBRuU_2uV9aDTdqEt5dLs2JPk/edit?usp=sharing
>>
>>
>> On Tue, Sep 15, 2020 at 4:58 AM Guowei Ma  wrote:
>>
>>> Hi, Dawid
>>>
>>> >>I still find the merging case the most confusing. I don't necessarily
>>> understand why do you need the "SingleFileCommit" step in this scenario.
>>> The way I
>>> >> understand "commit" operation is that it makes some data/artifacts
>>> visible to the external system, thus it should be immutable from a point
>>> of
>>> view of a single >>process. Having an additional step in the same process
>>> that works on committed data contradicts with those assumptions. I might
>>> be
>>> missing something though. >> Could you elaborate >why can't it be
>>> something
>>> like FileWriter -> FileMergeWriter -> Committer (either global or
>>> non-global)? Again it might be just me not getting the example.
>>>
>>> I think you are right. The topology
>>> "FileWriter->FileMergeWriter->Committer" could meet the merge
>>> requirement.
>>> The topology "FileWriter-> SingleFileCommitter -> FileMergeWriter ->
>>> GlobalCommitter" reuses some code of the StreamingFileSink(For example
>>> rolling policy) so it has the "SingleFileCommitter" in the topology. In
>>> general I want to use the case to show that there are different
>>> topologies
>>> according to the requirements.
>>>
>>> BTW: IIRC, @Jingsong Lee  told me that the
>>> actual topology of the merge-supporting HiveSink is more complicated than
>>> that.
>>>
>>>
>>> >> I've just briefly skimmed over the proposed interfaces. I would
>>> suggest
>>> one
>>> >> addition to the Writer interface (as I understand this is the runtime
>>> >> interface in this proposal?): add some availability method, to avoid,
>>> if
>>> >> possible, blocking calls on the sink. We already have similar
>>> >> availability methods in the new sources [1] and in various places in
>>> the
>>> >> network stack [2].
>>> >> BTW Let's not forget about Piotr's comment. I think we could add the
>>> isAvailable or similar method to the Writer interface in the FLIP.
>>>
>>> Thanks @Dawid Wysakowicz   for your reminder.
>>> There
>>> are too many issues at the same time.

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-15 Thread Guowei Ma
Hi, Dawid

>>I still find the merging case the most confusing. I don't necessarily
understand why do you need the "SingleFileCommit" step in this scenario.
The way I
>> understand "commit" operation is that it makes some data/artifacts
visible to the external system, thus it should be immutable from a point of
view of a single >>process. Having an additional step in the same process
that works on committed data contradicts with those assumptions. I might be
missing something though. >> Could you elaborate >why can't it be something
like FileWriter -> FileMergeWriter -> Committer (either global or
non-global)? Again it might be just me not getting the example.

I think you are right. The topology
"FileWriter->FileMergeWriter->Committer" could meet the merge requirement.
The topology "FileWriter-> SingleFileCommitter -> FileMergeWriter ->
GlobalCommitter" reuses some code of the StreamingFileSink(For example
rolling policy) so it has the "SingleFileCommitter" in the topology. In
general I want to use the case to show that there are different topologies
according to the requirements.

BTW: IIRC, @Jingsong Lee  told me that the
actual topology of the merge-supporting HiveSink is more complicated than that.


>> I've just briefly skimmed over the proposed interfaces. I would suggest
one
>> addition to the Writer interface (as I understand this is the runtime
>> interface in this proposal?): add some availability method, to avoid, if
>> possible, blocking calls on the sink. We already have similar
>> availability methods in the new sources [1] and in various places in the
>> network stack [2].
>> BTW Let's not forget about Piotr's comment. I think we could add the
isAvailable or similar method to the Writer interface in the FLIP.

Thanks @Dawid Wysakowicz  for your reminder. There
are too many issues at the same time.

In addition to what Aljoscha said: there is very little system support for
it. Another thing I worry about is: does the sink's snapshot return
immediately when the sink's status is unavailable? Maybe we could do it by
deduping some elements in the state, but I think it might be too complicated.
What I want to know is which specific sink would benefit from this
feature. @piotr Please correct me if I misunderstand you. Thanks.
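(For reference, this is roughly what an availability-aware writer could look like if it followed the FLIP-27 style; the interface name and the CompletableFuture-based contract are assumptions for illustration, not the proposed FLIP-143 API.)

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;

    // The writer reports availability instead of blocking, similar in spirit
    // to SourceReader#isAvailable() from FLIP-27.
    public interface AvailabilityAwareWriter<InputT> {

        // A completed future means write() can be called without blocking.
        CompletableFuture<Void> isAvailable();

        // Should only be called while isAvailable() is completed.
        void write(InputT element) throws IOException;
    }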

Best,
Guowei


On Tue, Sep 15, 2020 at 3:55 PM Dawid Wysakowicz 
wrote:

> What I understand is that HiveSink's implementation might need the local
> committer(FileCommitter) because the file rename is needed.
> But the iceberg only needs to write the manifest file.  Would you like to
> enlighten me why the Iceberg needs the local committer?
> Thanks
>
> Sorry if I caused a confusion here. I am not saying the Iceberg sink needs
> a local committer. What I had in mind is that prior to the Iceberg example
> I did not see a need for a "GlobalCommitter" in the streaming case. I
> thought it is always enough to have the "normal" committer in that case.
> Now I understand that this differentiation is not really about logical
> separation. It is not really about the granularity with which we commit,
> i.e. answering the "WHAT" question. It is really about the performance and
> that in the end we will have a single "transaction", so it is about
> answering the question "HOW".
>
>
>-
>
>Commit a directory with merged files(Some user want to merge the files
>in a directory before committing the directory to Hive meta store)
>
>
>1.
>
>FileWriter -> SingleFileCommit -> FileMergeWriter  -> GlobalCommitter
>
> I still find the merging case the most confusing. I don't necessarily
> understand why do you need the "SingleFileCommit" step in this scenario.
> The way I understand "commit" operation is that it makes some
> data/artifacts visible to the external system, thus it should be immutable
> from a point of view of a single process. Having an additional step in the
> same process that works on committed data contradicts with those
> assumptions. I might be missing something though. Could you elaborate why
> can't it be something like FileWriter -> FileMergeWriter -> Committer
> (either global or non-global)? Again it might be just me not getting the
> example.
>
> I've just briefly skimmed over the proposed interfaces. I would suggest one
> addition to the Writer interface (as I understand this is the runtime
> interface in this proposal?): add some availability method, to avoid, if
> possible, blocking calls on the sink. We already have similar
> availability methods in the new sources [1] and in various places in the
> network stack [2].
>
> BTW Let's not forget about Piotr's comment.

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-14 Thread Guowei Ma
>> I would think that we only need flush() and the semantics are that it
>> prepares for a commit, so on a physical level it would be called from
>> "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
>> think flush() should be renamed to something like "prepareCommit()".

> Generally speaking it is a good point that emitting the committables
> should happen before emitting the checkpoint barrier downstream.
> However, if I remember offline discussions well, the idea behind
> Writer#flush and Writer#snapshotState was to differentiate commit on
> checkpoint vs final checkpoint at the end of the job. Both of these
> methods could emit committables, but the flush should not leave any in
> progress state (e.g. in case of file sink in STREAM mode, in
> snapshotState it could leave some open files that would be committed in
> a subsequent cycle, however flush should close all files). The
> snapshotState as it is now can not be called in
> prepareSnapshotPreBarrier as it can store some state, which should
> happen in Operator#snapshotState as otherwise it would always be
> synchronous. Therefore I think we would need sth like:

> void prepareCommit(boolean flush, WriterOutput output);

> ver 1:

> List snapshotState();

> ver 2:

> void snapshotState(); // not sure if we need that method at all in option 2

I second Dawid's proposal. This is a valid scenario. And version 2 does not
need the snapshotState() any more.
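(To make the two versions concrete, here is one possible Java shape of the proposal. The generic parameters CommT/StateT are guesses, since the archive stripped the angle brackets from the quoted signatures, and the WriterOutput method name is invented.)

    import java.util.List;

    // Hypothetical output handle: hands committables to the committer.
    interface WriterOutput<CommT> {
        void sendToCommit(CommT committable);
    }

    interface Writer<InputT, CommT, StateT> {

        void write(InputT element) throws Exception;

        // Called from prepareSnapshotPreBarrier. flush == true means the final
        // call (end of input): no in-progress state may remain, e.g. a file
        // sink must close all open files.
        void prepareCommit(boolean flush, WriterOutput<CommT> output) throws Exception;

        // Version 1: the writer hands its state back explicitly.
        List<StateT> snapshotState() throws Exception;

        // Version 2 would simply drop snapshotState(), as discussed above.
    }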

>> The Committer is as described in the FLIP, it's basically a function
>> "void commit(Committable)". The GobalCommitter would be a function "void
>> commit(List)". The former would be used by an S3 sink where
>> we can individually commit files to S3, a committable would be the list
>> of part uploads that will form the final file and the commit operation
>> creates the metadata in S3. The latter would be used by something like
>> Iceberg where the Committer needs a global view of all the commits to be
>> efficient and not overwhelm the system.
>>
>> I don't know yet if sinks would only implement one type of commit
>> function or potentially both at the same time, and maybe Commit can
>> return some CommitResult that gets shipped to the GlobalCommit function.
>> I must admit it I did not get the need for Local/Normal + Global
>> committer at first. The Iceberg example helped a lot. I think it makes a
>> lot of sense.
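(A hedged sketch of the two commit functions described above; the generic parameter is an assumption, as the angle brackets in the quoted signatures were stripped by the archive.)

    import java.util.List;

    // The per-committable committer, e.g. an S3/file sink where every
    // committable (a finished multi-part upload) can be committed on its own.
    interface Committer<CommT> {
        void commit(CommT committable) throws Exception;
    }

    // The global committer, e.g. Iceberg, where one commit needs a global
    // view over all committables of a checkpoint.
    interface GlobalCommitter<CommT> {
        void commit(List<CommT> committables) throws Exception;
    }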

@Dawid
What I understand is that HiveSink's implementation might need the local
committer (FileCommitter) because the file rename is needed.
But Iceberg only needs to write the manifest file. Would you like to
enlighten me on why Iceberg needs the local committer?
Thanks

Best,
Guowei


On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz 
wrote:

> Hi all,
>
> > I would think that we only need flush() and the semantics are that it
> > prepares for a commit, so on a physical level it would be called from
> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> > think flush() should be renamed to something like "prepareCommit()".
>
> Generally speaking it is a good point that emitting the committables
> should happen before emitting the checkpoint barrier downstream.
> However, if I remember offline discussions well, the idea behind
> Writer#flush and Writer#snapshotState was to differentiate commit on
> checkpoint vs final checkpoint at the end of the job. Both of these
> methods could emit committables, but the flush should not leave any in
> progress state (e.g. in case of file sink in STREAM mode, in
> snapshotState it could leave some open files that would be committed in
> a subsequent cycle, however flush should close all files). The
> snapshotState as it is now can not be called in
> prepareSnapshotPreBarrier as it can store some state, which should
> happen in Operator#snapshotState as otherwise it would always be
> synchronous. Therefore I think we would need sth like:
>
> void prepareCommit(boolean flush, WriterOutput output);
>
> ver 1:
>
> List snapshotState();
>
> ver 2:
>
> void snapshotState(); // not sure if we need that method at all in option 2
>
> > The Committer is as described in the FLIP, it's basically a function
> > "void commit(Committable)". The GobalCommitter would be a function "void
> > commit(List)". The former would be used by an S3 sink where
> > we can individually commit files to S3, a committable would be the list
> > of part uploads that will form the final file and the commit operation
> > creates the metadata in S3. The latter would be used by something like
> > Iceberg where the Committer needs a global view of all the commits to be
> > efficient and not overwhelm the system.
> >
> > I don't know yet if sinks would only implement one type of commit
> > function or potentially both at the same time, and maybe Commit can
> > return some CommitResult that gets shipped to the GlobalCommit function.
> I must admit it I did not get the need for Local/Normal + Global
> committer at first. The Iceberg example helped a lot. 

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-14 Thread Guowei Ma
Hi, aljoscha

>I don't understand why we need the "Drain and Snapshot" section. It
>seems to be some details about stop-with-savepoint and drain, and the
>relation to BATCH execution but I don't know if it is needed to
>understand the rest of the document. I'm happy to be wrong here, though,
>if there's good reasons for the section.

The new unified sink API should provide a way for the sink developer to
deal with EOI (drain) to guarantee exactly-once semantics. This is what
I mostly want to say in this section. The current streaming-style sink API
does not provide a good way to deal with it, which is why the
`StreamingFileSink` does not commit the last part of data in the bounded
scenario. Our theme is unification. I am afraid that users might
misunderstand that adding this requirement to the new sink API is only for
bounded scenarios, so I explained in this paragraph that stop-with-savepoint
might also have a similar requirement.

For the snapshot, I also want to prevent users from misunderstanding that it
is specially prepared for the unbounded scenario. Actually it might also be
possible with the bounded + batch execution mode in the future.

I could reorganize the section if this section makes the reader confused
but I think we might need to keep the drain at least. WDYT?

>On the question of Alternative 1 and 2, I have a strong preference for
>Alternative 1 because we could avoid strong coupling to other modules.
>With Alternative 2 we would depend on `flink-streaming-java` and even
>`flink-runtime`. For the new source API (FLIP-27) we managed to keep the
>dependencies slim and the code is in flink-core. I'd be very happy if we
>can manage the same for the new sink API.

I am open to alternative 1. Maybe I am missing something, but I do not get
why the second alternative would depend on `flink-runtime` or
`flink-streaming-java`. All the state APIs are currently in flink-core.
Could you give some further explanation? Thanks :)

Best,
Guowei


On Tue, Sep 15, 2020 at 12:05 PM Guowei Ma  wrote:

> ## Concurrent checkpoints
> AFAIK the committer would not see the file-1-2 when ck1 happens in the
> ExactlyOnce mode.
>
> ## Committable bookkeeping and combining
>
> I agree with you that the "CombineGlobalCommitter" would work. But we put
> more optimization logic in the committer, which will make the committer
> more and more complicated, and eventually become the same as the
> Writer. For example, The committer needs to clean up some unused manifest
> file when restoring from a failure if we introduce the optimizations to the
> committer.
>
> In this case another alternative might be to put this "merging"
> optimization to a separate agg operator(maybe just like another `Writer`?).
> The agg could produce an aggregated committable to the committer. The agg
> operator could manage the whole life cycle of the manifest file it created.
> It would make the committer have single responsibility.
>
> >>The main question is if this pattern is generic to be put into the sink
> framework or not.
> Maybe I am wrong. But what I can feel from the current discussion is that
> different requirements have different topological requirements.
>
> ## Using checkpointId
> In the batch execution mode there would be no normal checkpoint any more.
> That is why we do not introduce the checkpoint id in the API. So it is a
> great thing that sink decouples its implementation from checkpointid. :)
>
> Best,
> Guowei
>
>
> On Tue, Sep 15, 2020 at 7:33 AM Steven Wu  wrote:
>
>>
>> ## concurrent checkpoints
>>
>> @Aljoscha Krettek  regarding the concurrent
>> checkpoints, let me illustrate with a simple DAG below.
>> [image: image.png]
>>
>> Let's assume each writer emits one file per checkpoint cycle and *writer-2
>> is slow*. Now let's look at what the global committer receives
>>
>> timeline:
>> --> Now
>> from Writer-1:  file-1-1, ck-1, file-1-2, ck-2
>> from Writer-2:
>> file-2-1, ck-1
>>
>> In this case, the committer shouldn't include "file-1-2" into the commit
>> for ck-1.
>>
>> ## Committable bookkeeping and combining
>>
>> I like David's proposal where the framework takes care of the
>> bookkeeping of committables and provides a combiner API (CommT ->
>> GlobalCommT) for GlobalCommitter. The only requirement is to tie the
>> commit/CommT/GlobalCommT to a checkpoint.
>>
>> When a commit is successful for checkpoint-N, the framework needs to
>> remove the GlobalCommT from the state corresponding to checkpoints <= N. If
>> a commit fails, the GlobalCommT accumulates a

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-14 Thread Guowei Ma
## Concurrent checkpoints
AFAIK the committer would not see the file-1-2 when ck1 happens in the
ExactlyOnce mode.

## Committable bookkeeping and combining

I agree with you that the "CombineGlobalCommitter" would work. But we put
more optimization logic in the committer, which will make the committer
more and more complicated, and eventually become the same as the
Writer. For example, The committer needs to clean up some unused manifest
file when restoring from a failure if we introduce the optimizations to the
committer.

In this case another alternative might be to put this "merging"
optimization into a separate agg operator (maybe just like another `Writer`?).
The agg could produce an aggregated committable for the committer. The agg
operator could manage the whole life cycle of the manifest files it creates.
This would leave the committer with a single responsibility.
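(A minimal sketch of such an agg step: it just collects the per-writer committables of one checkpoint and combines them into a single aggregated committable before the committer. All names are illustrative.)

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Function;

    // Collects the committables of the current checkpoint and combines them
    // into one aggregated committable (e.g. data files -> one manifest).
    final class CommittableAggregator<CommT, GlobalCommT> {

        private final List<CommT> pending = new ArrayList<>();
        private final Function<List<CommT>, GlobalCommT> combiner;

        CommittableAggregator(Function<List<CommT>, GlobalCommT> combiner) {
            this.combiner = combiner;
        }

        void collect(CommT committable) {
            pending.add(committable);
        }

        // Called once per checkpoint (e.g. from prepareCommit); the result is
        // what gets handed to the (global) committer.
        GlobalCommT aggregateForCheckpoint() {
            GlobalCommT aggregated = combiner.apply(new ArrayList<>(pending));
            pending.clear();
            return aggregated;
        }
    }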

>>The main question is if this pattern is generic to be put into the sink
framework or not.
Maybe I am wrong. But what I can feel from the current discussion is that
different requirements have different topological requirements.

## Using checkpointId
In the batch execution mode there would be no normal checkpoint any more.
That is why we do not introduce the checkpoint id in the API. So it is a
great thing that the sink decouples its implementation from the checkpoint id. :)

Best,
Guowei


On Tue, Sep 15, 2020 at 7:33 AM Steven Wu  wrote:

>
> ## concurrent checkpoints
>
> @Aljoscha Krettek  regarding the concurrent
> checkpoints, let me illustrate with a simple DAG below.
> [image: image.png]
>
> Let's assume each writer emits one file per checkpoint cycle and *writer-2
> is slow*. Now let's look at what the global committer receives
>
> timeline:
> --> Now
> from Writer-1:  file-1-1, ck-1, file-1-2, ck-2
> from Writer-2:
> file-2-1, ck-1
>
> In this case, the committer shouldn't include "file-1-2" into the commit
> for ck-1.
>
> ## Committable bookkeeping and combining
>
> I like David's proposal where the framework takes care of the
> bookkeeping of committables and provides a combiner API (CommT ->
> GlobalCommT) for GlobalCommitter. The only requirement is to tie the
> commit/CommT/GlobalCommT to a checkpoint.
>
> When a commit is successful for checkpoint-N, the framework needs to
> remove the GlobalCommT from the state corresponding to checkpoints <= N. If
> a commit fails, the GlobalCommT accumulates and will be included in the
> next cycle. That is how the Iceberg sink works. I think it is good to
> piggyback retries with Flink's periodic checkpoints for Iceberg sink.
> Otherwise, it can get complicated to implement retry logic that won't
> interfere with Flink checkpoints.
>
> The main question is if this pattern is generic to be put into the sink
> framework or not.
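(A small sketch of the bookkeeping described above, keyed by checkpoint id; purely illustrative, not framework code.)

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeMap;

    // Pending global committables tracked per checkpoint: on a successful
    // commit for checkpoint N everything up to N is dropped; failed commits
    // simply stay and are retried together with the next checkpoint.
    final class PendingCommits<GlobalCommT> {

        private final TreeMap<Long, List<GlobalCommT>> pending = new TreeMap<>();

        void add(long checkpointId, GlobalCommT committable) {
            pending.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(committable);
        }

        // Everything with checkpointId <= upTo should be committed now.
        List<GlobalCommT> committablesUpTo(long upTo) {
            List<GlobalCommT> result = new ArrayList<>();
            for (List<GlobalCommT> commits : pending.headMap(upTo, true).values()) {
                result.addAll(commits);
            }
            return result;
        }

        // Call only after the external commit for upTo succeeded.
        void markCommitted(long upTo) {
            pending.headMap(upTo, true).clear();
        }
    }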
>
> > A alternative topology option for the IcebergSink might be :
> DataFileWriter
> -> Agg -> GlobalCommitter. One pro of this method is that we can let Agg
> take care of the cleanup instead of coupling the cleanup logic to the
> committer.
>
> @Guowei Ma  I would favor David's suggestion of
> "combine" API rather than a separate "Agg" operator.
>
> ## Using checkpointId
>
> > I think this can have some problems, for example when checkpoint ids are
> not strictly sequential, when we wrap around, or when the JobID changes.
> This will happen when doing a stop/start-from-savepoint cycle, for example.
>
> checkpointId can work if it is monotonically increasing, which I believe
> is the case for Flink today. Restoring from checkpoint or savepoint will
> resume the checkpointIds.
>
> We can deal with JobID change by saving it into the state and Iceberg
> snapshot metadata. There is already a PR [1] for that.
>
> ## Nonce
>
> > Flink provide a nonce to the GlobalCommitter where Flink guarantees that
> this nonce is unique
>
> That is actually how we implemented internally. Flink Iceberg sink
> basically hashes the Manifest file location as the nonce. Since the Flink
> generated Manifest file location is unique, it  guarantees the nonce is
> unique.
>
> IMO, checkpointId is also one way of implementing Nonce based on today's
> Flink behavior.
>
> > and will not change for repeated invocations of the GlobalCommitter with
> the same set of committables
>
>  if the same set of committables are combined into one GlobalCommT (like
> ManifestFile in Iceberg), then the Nonce could be part of the GlobalCommT
> interface.
>
> BTW, as David pointed out, the ManifestFile optimization is only in our
> internal implementation [2] right now. For the open source version, t

Re: [DISCUSS] FLIP-143: Unified Sink API

2020-09-14 Thread Guowei Ma
Hi all,


Many thanks for the discussion and the valuable opinions! Currently there
are several ongoing issues and we would like to show what we are thinking
in the next few mails.

It seems that the biggest issue now is about the topology of the sinks.
Before deciding what the sink API would look like, I would like to first
summarize the different topologies we have mentioned so that we could sync
on the same page and gain more insights about this issue. There are four
types of topology I could see. Please correct me if I misunderstand what
you mean:

   1. Commit individual files. (StreamingFileSink)
      FileWriter -> FileCommitter

   2. Commit a directory (HiveSink)
      FileWriter -> FileCommitter -> GlobalCommitter

   3. Commit a bundle of files (Iceberg)
      DataFileWriter -> GlobalCommitter

   4. Commit a directory with merged files (some users want to merge the files
      in a directory before committing the directory to the Hive meta store)
      FileWriter -> SingleFileCommit -> FileMergeWriter -> GlobalCommitter


It can be seen from the above that the topologies differ according to the
requirements. Not only that, there may be other options for the second and
third categories, e.g.:

An alternative topology option for the IcebergSink might be: DataFileWriter
-> Agg -> GlobalCommitter. One pro of this method is that we can let Agg
take care of the cleanup instead of coupling the cleanup logic to the
committer.


In the long run I think we might provide the sink developer the ability to
build arbitrary topologies. Maybe Flink could only provide a basic commit
transformation and let the user build other parts of the topology. In
1.12 we might first provide different patterns for these different
scenarios, and I think these components could be reused in the
future.

Best,
Guowei


On Mon, Sep 14, 2020 at 11:19 PM Dawid Wysakowicz 
wrote:

> Hi all,
>
> > I would think that we only need flush() and the semantics are that it
> > prepares for a commit, so on a physical level it would be called from
> > "prepareSnapshotPreBarrier". Now that I'm thinking about it more I
> > think flush() should be renamed to something like "prepareCommit()".
>
> Generally speaking it is a good point that emitting the committables
> should happen before emitting the checkpoint barrier downstream.
> However, if I remember offline discussions well, the idea behind
> Writer#flush and Writer#snapshotState was to differentiate commit on
> checkpoint vs final checkpoint at the end of the job. Both of these
> methods could emit committables, but the flush should not leave any in
> progress state (e.g. in case of file sink in STREAM mode, in
> snapshotState it could leave some open files that would be committed in
> a subsequent cycle, however flush should close all files). The
> snapshotState as it is now can not be called in
> prepareSnapshotPreBarrier as it can store some state, which should
> happen in Operator#snapshotState as otherwise it would always be
> synchronous. Therefore I think we would need sth like:
>
> void prepareCommit(boolean flush, WriterOutput output);
>
> ver 1:
>
> List snapshotState();
>
> ver 2:
>
> void snapshotState(); // not sure if we need that method at all in option 2
>
> > The Committer is as described in the FLIP, it's basically a function
> > "void commit(Committable)". The GobalCommitter would be a function "void
> > commit(List)". The former would be used by an S3 sink where
> > we can individually commit files to S3, a committable would be the list
> > of part uploads that will form the final file and the commit operation
> > creates the metadata in S3. The latter would be used by something like
> > Iceberg where the Committer needs a global view of all the commits to be
> > efficient and not overwhelm the system.
> >
> > I don't know yet if sinks would only implement one type of commit
> > function or potentially both at the same time, and maybe Commit can
> > return some CommitResult that gets shipped to the GlobalCommit function.
> I must admit it I did not get the need for Local/Normal + Global
> committer at first. The Iceberg example helped a lot. I think it makes a
> lot of sense.
>
> > For Iceberg, writers don't need any state. But the GlobalCommitter
> > needs to
> > checkpoint StateT. For the committer, CommT is "DataFile". Since a single
> > committer can collect thousands (or more) data files in one checkpoint
> > cycle, as an optimization we checkpoint a single "ManifestFile" (for the
> > collected thousands data files) as StateT. This allows us to absorb
> > extended commit outages without losing written/uploaded data files, as
> > operator state size is as small as one manifest file per checkpoint cycle
> > [2].
> > --
> > StateT snapshotState(SnapshotContext context) throws Exception;
> >
> > That means we also need the restoreCommitter API in the Sink interface
> > ---
> > C

[DISCUSS] FLIP-143: Unified Sink API

2020-09-10 Thread Guowei Ma
Hi, devs & users

As discussed in FLIP-131[1], Flink will deprecate the DataSet API in favor
of DataStream API and Table API. Users should be able to use DataStream API
to write jobs that support both bounded and unbounded execution modes.
However, Flink does not provide a sink API to guarantee the Exactly-once
semantics in both bounded and unbounded scenarios, which blocks the
unification.

So we want to introduce a new unified sink API which could let the user
develop the sink once and run it everywhere. You could find more details in
FLIP-143[2].

The FLIP contains some open questions that I'd really appreciate inputs
from the community. Some of the open questions include:

   1. We provide two alternative Sink APIs in the FLIP. The only difference
   between the two versions is how to expose the state to the user. We want to
   know which one you prefer.
   2. How does the sink API support writing to Hive?
   3. Is the sink an operator or a topology?

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

Best,
Guowei


Re: [VOTE] FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API)

2020-09-03 Thread Guowei Ma
+1
Looking forward to having a unified datastream api.
Best,
Guowei


On Thu, Sep 3, 2020 at 3:46 PM Dawid Wysakowicz 
wrote:

> +1
>
> I think it gives a clear idea why we should deprecate and eventually
> remove the DataSet API.
>
> Best,
>
> Dawid
>
> On 03/09/2020 09:37, Yun Gao wrote:
> > Many thanks for bringing this up! +1 for deprecating the DataSet API and
> providing a unified streaming/batch programming model to users.
> >
> > Best,
> >  Yun
> >
> >
> > --
> > Sender:Aljoscha Krettek
> > Date:2020/09/02 19:22:51
> > Recipient:Flink Dev
> > Theme:[VOTE] FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs
> (and deprecate the DataSet API)
> >
> > Hi all,
> >
> > After the discussion in [1], I would like to open a voting thread for
> > FLIP-131 (https://s.apache.org/FLIP-131) [2] which discusses the
> > deprecation of the DataSet API and future work on the DataStream API and
> > Table API for bounded (batch) execution.
> >
> > The vote will be open until September 7 (72h + weekend), unless there is
> > an objection or not enough votes.
> >
> > Regards,
> > Aljoscha
> >
> > [1]
> >
> https://lists.apache.org/thread.html/r4f24c4312cef7270a1349c39b89fb1184c84065944b43aedf9cfba6a%40%3Cdev.flink.apache.org%3E
> > [2]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
>
>


Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-25 Thread Guowei Ma
Hi,

Thanks a lot for being the release manager Zhu Zhu!
Thanks to everyone who contributed to this!

Best,
Guowei


On Wed, Aug 26, 2020 at 11:18 AM Yun Tang  wrote:

> Thanks for Zhu's work to manage this release and everyone who contributed
> to this!
>
> Best,
> Yun Tang
> 
> From: Yangze Guo 
> Sent: Tuesday, August 25, 2020 14:47
> To: Dian Fu 
> Cc: Zhu Zhu ; dev ; user <
> u...@flink.apache.org>; user-zh 
> Subject: Re: [ANNOUNCE] Apache Flink 1.10.2 released
>
> Thanks a lot for being the release manager Zhu Zhu!
> Congrats to all others who have contributed to the release!
>
> Best,
> Yangze Guo
>
> On Tue, Aug 25, 2020 at 2:42 PM Dian Fu  wrote:
> >
> > Thanks ZhuZhu for managing this release and everyone else who
> contributed to this release!
> >
> > Regards,
> > Dian
> >
> > On Aug 25, 2020, at 2:22 PM, Till Rohrmann  wrote:
> >
> > Great news. Thanks a lot for being our release manager Zhu Zhu and to
> all others who have contributed to the release!
> >
> > Cheers,
> > Till
> >
> > On Tue, Aug 25, 2020 at 5:37 AM Zhu Zhu  wrote:
> >>
> >> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.10.2, which is the first bugfix release for the Apache Flink
> 1.10 series.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html
> >>
> >> Please check out the release blog post for an overview of the
> improvements for this bugfix release:
> >> https://flink.apache.org/news/2020/08/25/release-1.10.2.html
> >>
> >> The full release notes are available in Jira:
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347791
> >>
> >> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
> >>
> >> Thanks,
> >> Zhu
> >
> >
>


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-24 Thread Guowei Ma
Hi, Klou

Thanks for your proposal. It's a very good idea.
Just a little comment about the "Batch vs Streaming Scheduling": in the
AUTOMATIC execution mode we maybe cannot pick the BATCH execution mode even
if all sources are bounded. For example, some applications use the
`CheckpointListener`, which is not available in BATCH mode in the current
implementation.
So maybe we need more checks in the AUTOMATIC execution mode.
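(Purely illustrative pseudologic for such a check; none of these types correspond to actual Flink classes.)

    // All names invented for this sketch; not actual Flink classes.
    enum RuntimeMode { BATCH, STREAMING }

    final class ExecutionModeDecider {
        static RuntimeMode decide(boolean allSourcesBounded, boolean jobNeedsCheckpoints) {
            // Bounded sources alone are not enough: if the job relies on
            // checkpoint callbacks (e.g. CheckpointListener), BATCH mode, which
            // currently has no checkpoints, would silently break it.
            if (allSourcesBounded && !jobNeedsCheckpoints) {
                return RuntimeMode.BATCH;
            }
            return RuntimeMode.STREAMING;
        }
    }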

Best,
Guowei


On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas  wrote:

> Hi all,
>
> Thanks for the comments!
>
> @Dawid: "execution.mode" can be a nice alternative and from a quick
> look it is not used currently by any configuration option. I will
> update the FLIP accordingly.
>
> @David: Given that having the option to allow timers to fire at the
> end of the job is already in the FLIP, I will leave it as is and I
> will update the default policy to be "ignore processing time timers
> set by the user". This will allow existing dataStream programs to run
> on bounded inputs. This update will affect point 2 in the "Processing
> Time Support in Batch" section.
>
> If these changes cover your proposals, then I would like to start a
> voting thread tomorrow evening if this is ok with you.
>
> Please let me know until then.
>
> Kostas
>
> On Tue, Aug 18, 2020 at 3:54 PM David Anderson 
> wrote:
> >
> > Being able to optionally fire registered processing time timers at the
> end of a job would be interesting, and would help in (at least some of) the
> cases I have in mind. I don't have a better idea.
> >
> > David
> >
> > On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas 
> wrote:
> >>
> >> Hi Kurt and David,
> >>
> >> Thanks a lot for the insightful feedback!
> >>
> >> @Kurt: For the topic of checkpointing with Batch Scheduling, I totally
> >> agree with you that it requires a lot more work and careful thinking
> >> on the semantics. This FLIP was written under the assumption that if
> >> the user wants to have checkpoints on bounded input, he/she will have
> >> to go with STREAMING as the scheduling mode. Checkpointing for BATCH
> >> can be handled as a separate topic in the future.
> >>
> >> In the case of MIXED workloads and for this FLIP, the scheduling mode
> >> should be set to STREAMING. That is why the AUTOMATIC option sets
> >> scheduling to BATCH only if all the sources are bounded. I am not sure
> >> what are the plans there at the scheduling level, as one could imagine
> >> in the future that in mixed workloads, we schedule first all the
> >> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
> >> subgraph per application, which is going to be scheduled after all
> >> Bounded ones have finished. Essentially the bounded subgraphs will be
> >> used to bootstrap the unbounded one. But, I am not aware of any plans
> >> towards that direction.
> >>
> >>
> >> @David: The processing time timer handling is a topic that has also
> >> been discussed in the community in the past, and I do not remember any
> >> final conclusion unfortunately.
> >>
> >> In the current context and for bounded input, we chose to favor
> >> reproducibility of the result, as this is expected in batch processing
> >> where the whole input is available in advance. This is why this
> >> proposal suggests to not allow processing time timers. But I
> >> understand your argument that the user may want to be able to run the
> >> same pipeline on batch and streaming this is why we added the two
> >> options under future work, namely (from the FLIP):
> >>
> >> ```
> >> Future Work: In the future we may consider adding as options the
> capability of:
> >> * firing all the registered processing time timers at the end of a job
> >> (at close()) or,
> >> * ignoring all the registered processing time timers at the end of a
> job.
> >> ```
> >>
> >> Conceptually, we are essentially saying that we assume that batch
> >> execution is assumed to be instantaneous and refers to a single
> >> "point" in time and any processing-time timers for the future may fire
> >> at the end of execution or be ignored (but not throw an exception). I
> >> could also see ignoring the timers in batch as the default, if this
> >> makes more sense.
> >>
> >> By the way, do you have any usecases in mind that will help us better
> >> shape our processing time timer handling?
> >>
> >> Kostas
> >>
> >> On Mon, Aug 17, 2020 at 2:52 PM David Anderson 
> wrote:
> >> >
> >> > Kostas,
> >> >
> >> > I'm pleased to see some concrete details in this FLIP.
> >> >
> >> > I wonder if the current proposal goes far enough in the direction of
> recognizing the need some users may have for "batch" and "bounded
> streaming" to be treated differently. If I've understood it correctly, the
> section on scheduling allows me to choose STREAMING scheduling even if I
> have bounded sources. I like that approach, because it recognizes that even
> though I have bounded inputs, I don't necessarily want batch processing
> semantics. I think it makes sense to exten

Re: [ANNOUNCE] New PMC member: Piotr Nowojski

2020-07-07 Thread Guowei Ma
Congratulations!

Best,
Guowei


Fabian Hueske  wrote on Tue, Jul 7, 2020 at 4:34 PM:

> Congrats Piotr!
>
> Cheers, Fabian
>
> On Tue, Jul 7, 2020 at 10:04 AM, Marta Paes Moreira <
> ma...@ververica.com> wrote:
>
> > Go Piotr! Congrats!
> >
> > On Tue, Jul 7, 2020 at 7:15 AM Biao Liu  wrote:
> >
> > > Congrats Piotr! Well deserved!
> > >
> > > Thanks,
> > > Biao /'bɪ.aʊ/
> > >
> > >
> > >
> > > On Tue, 7 Jul 2020 at 13:03, Congxian Qiu 
> > wrote:
> > >
> > > > Congratulations Piotr!
> > > >
> > > > Best,
> > > > Congxian
> > > >
> > > >
> > > > Zhijiang  wrote on Tue, Jul 7, 2020
> at 12:25 PM:
> > > >
> > > > > Congratulations Piotr!
> > > > >
> > > > > Best,
> > > > > Zhijiang
> > > > >
> > > > >
> > > > > --
> > > > > From:Rui Li 
> > > > > Send Time: Tue, Jul 7, 2020 11:55
> > > > > To:dev 
> > > > > Cc:pnowojski 
> > > > > Subject:Re: [ANNOUNCE] New PMC member: Piotr Nowojski
> > > > >
> > > > > Congrats!
> > > > >
> > > > > On Tue, Jul 7, 2020 at 11:25 AM Yangze Guo 
> > wrote:
> > > > >
> > > > > > Congratulations!
> > > > > >
> > > > > > Best,
> > > > > > Yangze Guo
> > > > > >
> > > > > > On Tue, Jul 7, 2020 at 11:01 AM Jiayi Liao <
> > buptliaoji...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > Congratulations Piotr!
> > > > > > >
> > > > > > > Best,
> > > > > > > Jiayi Liao
> > > > > > >
> > > > > > > On Tue, Jul 7, 2020 at 10:54 AM Jark Wu 
> > wrote:
> > > > > > >
> > > > > > > > Congratulations Piotr!
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jark
> > > > > > > >
> > > > > > > > On Tue, 7 Jul 2020 at 10:50, Yuan Mei <
> yuanmei.w...@gmail.com>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Congratulations, Piotr!
> > > > > > > > >
> > > > > > > > > On Tue, Jul 7, 2020 at 1:07 AM Stephan Ewen <
> > se...@apache.org>
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi all!
> > > > > > > > > >
> > > > > > > > > > It is my pleasure to announce that Piotr Nowojski joined
> > the
> > > > > Flink
> > > > > > PMC.
> > > > > > > > > >
> > > > > > > > > > Many of you may know Piotr from the work he does on the
> > data
> > > > > > processing
> > > > > > > > > > runtime and the network stack, from the mailing list, or
> > the
> > > > > > release
> > > > > > > > > > manager work.
> > > > > > > > > >
> > > > > > > > > > Congrats, Piotr!
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Stephan
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Best regards!
> > > > > Rui Li
> > > > >
> > > > >
> > > >
> > >
> >
>


Re: [ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-17 Thread Guowei Ma
Congratulations, Yu!
Best,
Guowei


Yang Wang  wrote on Thu, Jun 18, 2020 at 10:36 AM:

> Congratulations, Yu!
>
> Best,
> Yang
>
> Piotr Nowojski  wrote on Wed, Jun 17, 2020 at 9:21 PM:
>
> > Congratulations :)
> >
> > > On 17 Jun 2020, at 14:53, Yun Tang  wrote:
> > >
> > > Congratulations, Yu! Well deserved.
> > >
> > > Best
> > > Yun Tang
> > > 
> > > From: Yu Li 
> > > Sent: Wednesday, June 17, 2020 20:03
> > > To: dev 
> > > Subject: Re: [ANNOUNCE] Yu Li is now part of the Flink PMC
> > >
> > > Thanks everyone! Really happy to work in such a great and encouraging
> > > community!
> > >
> > > Best Regards,
> > > Yu
> > >
> > >
> > > On Wed, 17 Jun 2020 at 19:59, Congxian Qiu 
> > wrote:
> > >
> > >> Congratulations, Yu!
> > >> Best,
> > >> Congxian
> > >>
> > >>
> > >> Thomas Weise  wrote on Wed, Jun 17, 2020 at 6:23 PM:
> > >>
> > >>> Congratulations!
> > >>>
> > >>>
> > >>> On Wed, Jun 17, 2020, 2:59 AM Fabian Hueske 
> wrote:
> > >>>
> >  Congrats Yu!
> > 
> >  Cheers, Fabian
> > 
> >  On Wed, Jun 17, 2020 at 10:20 AM, Till Rohrmann <
> >  trohrm...@apache.org> wrote:
> > 
> > > Congratulations Yu!
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Jun 17, 2020 at 7:53 AM Jingsong Li <
> jingsongl...@gmail.com>
> > > wrote:
> > >
> > >> Congratulations Yu, well deserved!
> > >>
> > >> Best,
> > >> Jingsong
> > >>
> > >> On Wed, Jun 17, 2020 at 1:42 PM Yuan Mei 
> >  wrote:
> > >>
> > >>> Congrats, Yu!
> > >>>
> > >>> GXGX & well deserved!!
> > >>>
> > >>> Best Regards,
> > >>>
> > >>> Yuan
> > >>>
> > >>> On Wed, Jun 17, 2020 at 9:15 AM jincheng sun <
> >  sunjincheng...@gmail.com>
> > >>> wrote:
> > >>>
> >  Hi all,
> > 
> >  On behalf of the Flink PMC, I'm happy to announce that Yu Li is
> > >> now
> >  part of the Apache Flink Project Management Committee (PMC).
> > 
> >  Yu Li has been very active on Flink's Statebackend component,
> > >>> working
> > > on
> >  various improvements, for example the RocksDB memory management
> > >> for
> > > 1.10.
> >  and keeps checking and voting for our releases, and also has
> > > successfully
> >  produced two releases(1.10.0&1.10.1) as RM.
> > 
> >  Congratulations & Welcome Yu Li!
> > 
> >  Best,
> >  Jincheng (on behalf of the Flink PMC)
> > 
> > >>>
> > >>
> > >> --
> > >> Best, Jingsong Lee
> > >>
> > >
> > 
> > >>>
> > >>
> >
> >
>


Re: NoSuchMethodError: org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.(Ljava/lang/String;Lorg/apache/flink/fs/s3presto/common/HadoopConfigLoader

2020-06-09 Thread Guowei Ma
Sorry, I checked the code and found that
'org/apache/flink/fs/s3presto/common/HadoopConfigLoader' is shaded on
purpose.
So could you check whether there is an old "presto" jar in your lib
directory?
Best,
Guowei
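
To make the check above concrete, here is a small, hypothetical probe (not a Flink utility); run it with the same jars on the classpath that the cluster uses. The class names are the ones appearing in this thread's stack trace:

```java
public class HadoopConfigLoaderProbe {

    private static void probe(String className) {
        try {
            Class.forName(className);
            System.out.println("FOUND   : " + className);
        } catch (ClassNotFoundException e) {
            System.out.println("missing : " + className);
        }
    }

    public static void main(String[] args) {
        // Location in Flink 1.10 (lives in flink-dist).
        probe("org.apache.flink.runtime.util.HadoopConfigLoader");
        // Pre-1.10 location used by the old s3 filesystem factories.
        probe("org.apache.flink.fs.s3.common.HadoopConfigLoader");
        // Shaded copy from an old flink-s3-fs-presto jar; if this shows up,
        // an outdated presto jar is most likely still on the classpath.
        probe("org.apache.flink.fs.s3presto.common.HadoopConfigLoader");
    }
}
```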


Claude Murad  wrote on Wed, Jun 10, 2020 at 10:06 AM:

> Thanks for your reply.  The
> org.apache.flink.fs.s3presto.common.HadoopConfigLoader is not in the jar.
> Did you mean to remove the S3FileSystemFactory if the class doesn't exist?
> If so, how will it work without the S3FileSystemFactory?  Also, this is the
> jar that was packaged in the Flink docker image.
>
> On Tue, Jun 9, 2020 at 9:46 PM Guowei Ma  wrote:
>
>> Hi,
>> In 1.10 there is no
>> 'Lorg/apache/flink/fs/s3presto/common/HadoopConfigLoader'. So I think
>> there might be a legacy S3FileSystemFactory in your jar. You could check
>> whether there is an 'org.apache.flink.fs.s3presto.common.HadoopConfigLoader'
>> in your jar or not. If there is one, you could remove the
>> old S3FileSystemFactory and try again.
>>
>> Btw, I think you should not copy both flink-s3-fs-hadoop-1.10.0.jar
>> and flink-s3-fs-presto-1.10.0.jar into the same plugin dir. [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/s3.html#hadooppresto-s3-file-systems-plugins
>> Best,
>> Guowei
>>
>>
>> Claude Murad  wrote on Wed, Jun 10, 2020 at 4:06 AM:
>>
>>> Hello,
>>>
>>> I'm trying to upgrade Flink from 1.7 to 1.10 retaining our Hadoop
>>> integration.  I copied the jar
>>> file flink-shaded-hadoop-2-uber-2.7.5-10.0.jar into /opt/flink/lib.  I also
>>> copied the files flink-s3-fs-hadoop-1.10.0.jar and
>>> flink-s3-fs-presto-1.10.0.jar into /opt/flink/plugins/s3 folder.  The error
>>> below occurs after deploying and launching docker image 1.10.0-scala_2.11.
>>> I saw that S3FileSystemFactory.java is now importing
>>> org.apache.flink.runtime.util.HadoopConfigLoader instead of
>>> org.apache.flink.fs.s3.common.HadoopConfigLoader which is how it was
>>> before.  I see the jar file flink-dist_2.11-1.10.0.jar contains
>>> the org.apache.flink.runtime.util.HadoopConfigLoader and it is under the
>>> folder /opt/flink/lib.  Any ideas on how to resolve this error?  Any help
>>> would be greatly appreciated, thank you.
>>>
>>>
>>> ERROR org.apache.flink.core.fs.FileSystem   -
>>> Failed to load a file system via services
>>> java.util.ServiceConfigurationError:
>>> org.apache.flink.core.fs.FileSystemFactory: Provider
>>> org.apache.flink.fs.s3presto.S3PFileSystemFactory could not be instantiated
>>> at java.util.ServiceLoader.fail(ServiceLoader.java:232)
>>> at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
>>> at
>>> java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
>>> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
>>> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
>>> at
>>> org.apache.flink.core.fs.FileSystem.addAllFactoriesToList(FileSystem.java:1024)
>>> at
>>> org.apache.flink.core.fs.FileSystem.loadFileSystemFactories(FileSystem.java:1006)
>>> at org.apache.flink.core.fs.FileSystem.initialize(FileSystem.java:303)
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.configureFileSystems(ClusterEntrypoint.java:194)
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:164)
>>> at
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:518)
>>> at
>>> org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint.main(StandaloneSessionClusterEntrypoint.java:64)
>>> Caused by: java.lang.NoSuchMethodError:
>>> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.(Ljava/lang/String;Lorg/apache/flink/fs/s3presto/common/HadoopConfigLoader;)V
>>> at
>>> org.apache.flink.fs.s3presto.S3FileSystemFactory.(S3FileSystemFactory.java:50)
>>> at
>>> org.apache.flink.fs.s3presto.S3PFileSystemFactory.(S3PFileSystemFactory.java:24)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>> at
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>> at java.lang.Class.newInstance(Class.java:442)
>>> at
>>> java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
>>>
>>


Re: [ANNOUNCE] New Apache Flink Committer - Xintong Song

2020-06-07 Thread Guowei Ma
Congratulations

Best,
Guowei


Matt Wang  wrote on Sat, Jun 6, 2020 at 9:56 AM:

> Congratulations!
>
>
> ---
> Best,
> Matt Wang
>
>
> On 06/5/2020 22:34,Andrey Zagrebin wrote:
> Welcome to committers and congrats, Xintong!
>
> Cheers,
> Andrey
>
> On Fri, Jun 5, 2020 at 4:22 PM Till Rohrmann  wrote:
>
> Congratulations!
>
> Cheers,
> Till
>
> On Fri, Jun 5, 2020 at 10:00 AM Dawid Wysakowicz 
> wrote:
>
> Congratulations!
>
> Best,
>
> Dawid
>
> On 05/06/2020 09:10, tison wrote:
> Congrats, Xintong!
>
> Best,
> tison.
>
>
> Jark Wu  wrote on Fri, Jun 5, 2020 at 3:00 PM:
>
> Congratulations Xintong!
>
> Best,
> Jark
>
> On Fri, 5 Jun 2020 at 14:32, Danny Chan  wrote:
>
> Congratulations Xintong !
>
> Best,
> Danny Chan
> On Jun 5, 2020 at 2:20 PM (+0800), dev@flink.apache.org wrote:
> Congratulations Xintong
>
>
>


Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-27 Thread Guowei Ma
Hi,
I think the StreamingFileSink does not support Azure currently.
You can find more detailed info here [1].

[1] https://issues.apache.org/jira/browse/FLINK-17444
Best,
Guowei
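
For context, a minimal sketch (path and names are illustrative) of the kind of job that runs into this: the StreamingFileSink needs a RecoverableWriter for the target filesystem, which the Azure filesystem did not provide at the time, so a job like the one below fails at runtime for an Azure path:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class AzureSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // Illustrative Azure Blob Storage path; this is where the missing
        // recoverable-writer support surfaces (see FLINK-17444 / FLINK-17989).
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(
                        new Path("wasbs://container@account.blob.core.windows.net/out"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("azure-streaming-file-sink-sketch");
    }
}
```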


Israel Ekpo  wrote on Thu, May 28, 2020 at 6:04 AM:

> You can assign the task to me and I would like to collaborate with someone
> to fix it.
>
> On Wed, May 27, 2020 at 5:52 PM Israel Ekpo  wrote:
>
>> Some users are running into issues when using Azure Blob Storage for the
>> StreamingFileSink
>>
>> https://issues.apache.org/jira/browse/FLINK-17989
>>
>> The issue is that certain packages are relocated in the POM file and
>> some classes are dropped from the final shaded jar.
>>
>> I have attempted to comment out the relocations and recompile the source,
>> but I keep hitting roadblocks with other relocations and filtering each time
>> I update a specific POM file.
>>
>> How can this be addressed so that these users can be unblocked? Why are
>> the classes filtered out? What is the workaround? I can work on the patch
>> if I have some guidance.
>>
>> This is an issue in Flink 1.9 and 1.10, and I believe 1.11 has the same
>> issue, but I have yet to confirm.
>>
>> Thanks.
>>
>>
>>
>


Re: java.lang.NoSuchMethodError while writing to Kafka from Flink

2020-05-24 Thread Guowei Ma
Hi,
1. You could first check whether 'org.apache.flink.api.java.clean' is on
your classpath.
2. Did you follow the doc [1] to deploy your local cluster and run some
existing examples such as WordCount?

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/cluster_setup.html
Best,
Guowei


Re: [ANNOUNCE] New Apache Flink PMC Member - Hequn Chen

2020-04-17 Thread Guowei Ma
Congratulations!

Best,
Guowei


Xintong Song  wrote on Fri, Apr 17, 2020 at 5:21 PM:

> Congratulations, Hequn~!
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Apr 17, 2020 at 4:50 PM godfrey he  wrote:
>
> > Congratulations, Hequn!
> >
> > Best,
> > Godfrey
> >
> > Leonard Xu  wrote on Fri, Apr 17, 2020 at 4:30 PM:
> >
> > > Congratulations!
> > >
> > > Best,
> > > Leonard Xu
> > > > On Apr 17, 2020, at 15:46, Benchao Li  wrote:
> > > >
> > > > Congratulations Hequn!
> > > >
> > > > Stephan Ewen  wrote on Fri, Apr 17, 2020 at 3:42 PM:
> > > >
> > > >> Congrats!
> > > >>
> > > >> On Fri, Apr 17, 2020 at 9:40 AM Jark Wu  wrote:
> > > >>
> > > >>> Congratulations Hequn!
> > > >>>
> > > >>> Best,
> > > >>> Jark
> > > >>>
> > > >>> On Fri, 17 Apr 2020 at 15:32, Yangze Guo 
> wrote:
> > > >>>
> > >  Congratulations!
> > > 
> > >  Best,
> > >  Yangze Guo
> > > 
> > >  On Fri, Apr 17, 2020 at 3:19 PM Jeff Zhang 
> > wrote:
> > > >
> > > > Congratulations, Hequn!
> > > >
> > > > Paul Lam  wrote on Fri, Apr 17, 2020 at 3:02 PM:
> > > >
> > > >> Congrats Hequn! Thanks a lot for your contribution to the
> > > >> community!
> > > >>
> > > >> Best,
> > > >> Paul Lam
> > > >>
> > > >> Dian Fu  wrote on Fri, Apr 17, 2020 at 2:58 PM:
> > > >>
> > > >>> Congratulations, Hequn!
> > > >>>
> > >  On Apr 17, 2020, at 2:36 PM, Becket Qin  wrote:
> > > 
> > >  Hi all,
> > > 
> > >  I am glad to announce that Hequn Chen has joined the Flink
> PMC.
> > > 
> > >  Hequn has contributed to Flink for years. He has worked on
> > > >>> several
> > >  components including Table / SQL,PyFlink and Flink ML
> Pipeline.
> > > >> Besides,
> > >  Hequn is also very active in the community since the
> beginning.
> > > 
> > >  Congratulations, Hequn! Looking forward to your future
> > >  contributions.
> > > 
> > >  Thanks,
> > > 
> > >  Jiangjie (Becket) Qin
> > >  (On behalf of the Apache Flink PMC)
> > > >>>
> > > >>>
> > > >>
> > > >
> > > >
> > > > --
> > > > Best Regards
> > > >
> > > > Jeff Zhang
> > > 
> > > >>>
> > > >>
> > > >
> > > >
> > > > --
> > > >
> > > > Benchao Li
> > > > School of Electronics Engineering and Computer Science, Peking
> > University
> > > > Tel:+86-15650713730
> > > > Email: libenc...@gmail.com; libenc...@pku.edu.cn
> > >
> > >
> >
>


Re: [ANNOUNCE] New Committers and PMC member

2020-04-07 Thread Guowei Ma
Congratulations!

Best,
Guowei


Dawid Wysakowicz  wrote on Tue, Apr 7, 2020 at 2:50 PM:

> Thank you all for the support!
>
> Best,
>
> Dawid
>
> On 02/04/2020 04:33, godfrey he wrote:
> > Congratulations to all of you~
> >
> > Best,
> > Godfrey
> >
> > Ismaël Mejía  wrote on Thu, Apr 2, 2020 at 6:42 AM:
> >
> >> Congrats everyone!
> >>
> >> On Thu, Apr 2, 2020 at 12:16 AM Rong Rong  wrote:
> >>> Congratulations to all!!!
> >>>
> >>> --
> >>> Rong
> >>>
> >>> On Wed, Apr 1, 2020 at 2:27 PM Thomas Weise  wrote:
> >>>
>  Congratulations!
> 
> 
>  On Wed, Apr 1, 2020 at 9:31 AM Fabian Hueske 
> >> wrote:
> > Congrats everyone!
> >
> > Cheers, Fabian
> >
> > On Wed, Apr 1, 2020 at 18:26, Yun Tang  >>> wrote:
> >> Congratulations to all of you!
> >>
> >> Best
> >> Yun Tang
> >> 
> >> From: Yang Wang 
> >> Sent: Wednesday, April 1, 2020 22:28
> >> To: dev 
> >> Subject: Re: [ANNOUNCE] New Committers and PMC member
> >>
> >> Congratulations all.
> >>
> >> Best,
> >> Yang
> >>
> >> Leonard Xu  wrote on Wed, Apr 1, 2020 at 10:15 PM:
> >>
> >>> Congratulations Konstantin, Dawid and Zhijiang!  Well deserved!
> >>>
> >>> Best,
> >>> Leonard Xu
>  On Apr 1, 2020, at 21:22, Jark Wu  wrote:
> 
>  Congratulations to you all!
> 
>  Best,
>  Jark
> 
>  On Wed, 1 Apr 2020 at 20:33, Kurt Young 
> >> wrote:
> > Congratulations to you all!
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Apr 1, 2020 at 7:41 PM Danny Chan <
> >> yuzhao@gmail.com>
> >> wrote:
> >> Congratulations!
> >>
> >> Best,
> >> Danny Chan
> >> On Apr 1, 2020 at 7:36 PM (+0800), dev@flink.apache.org wrote:
> >>> Congratulations!
> >>>
>
>


[jira] [Created] (FLINK-17041) Migrate current TypeInformation creation to the TypeInformationExtractor framework

2020-04-07 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-17041:
-

 Summary: Migrate current TypeInformation creation to the 
TypeInformationExtractor framework
 Key: FLINK-17041
 URL: https://issues.apache.org/jira/browse/FLINK-17041
 Project: Flink
  Issue Type: Sub-task
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17039) Introduce TypeInformationExtractor interface

2020-04-07 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-17039:
-

 Summary: Introduce TypeInformationExtractor interface
 Key: FLINK-17039
 URL: https://issues.apache.org/jira/browse/FLINK-17039
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17038) Decouple resolving Type and creating TypeInformation process

2020-04-07 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-17038:
-

 Summary: Decouple resolving Type and creating TypeInformation 
process
 Key: FLINK-17038
 URL: https://issues.apache.org/jira/browse/FLINK-17038
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.10.0
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-115: Filesystem connector in Table

2020-03-19 Thread Guowei Ma
Hi,


I am very interested in the topic. I would like to join the offline
discussion if possible. I think you guys have already given many inputs and
concerns. I would like to share some of my thoughts. Correct me if I have
misunderstood you.

Flink is a unified engine. Given that, Flink should provide e2e
exactly-once semantics for the user in both streaming and batch. E2E
exactly-once is not a trivial thing.

StreamingFileSink already does a lot of work on how to support e2e
exactly-once semantics for the “file” output scenario. It provides a
layered architecture:

   1. BulkWriter/Encoder deals with the data format in a file.
   2. BucketAssigner/RollingPolicy deals with the lifecycle of the file and
      the directory structure.
   3. RecoverableWriter deals with the exactly-once semantics.


All these layers are orthogonal and could be combined with each other. This
could reduce much of the work. (Currently, there are some limitations.)
There are some already-known issues, such as how to support batch in the
StreamingFileSink, how to support ORC, and so on. But there are already some
discussions on how to resolve these issues.
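
As a rough sketch (not part of the original mail; the concrete format, bucket assigner, and rolling policy are placeholders) of how those three layers compose on the existing API:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class LayeredFileSink {
    public static StreamingFileSink<String> create() {
        return StreamingFileSink
                // (1) format layer: how records are encoded inside a part file
                .forRowFormat(new Path("hdfs:///warehouse/output"),
                        new SimpleStringEncoder<String>("UTF-8"))
                // (2) lifecycle layer: directory structure and when part files roll
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                .withRollingPolicy(DefaultRollingPolicy.builder().build())
                // (3) exactly-once layer: the RecoverableWriter is picked per
                //     filesystem scheme when the sink is actually used
                .build();
    }
}
```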

Jingsong also gives some new cases that the StreamingFileSink might not
support currently. I am very glad to see that you all agree on improving
the StreamingFileSink architecture for these new cases.

Best,
Guowei


Jingsong Li  wrote on Thu, Mar 19, 2020 at 12:19 AM:

> Hi Stephan & Kostas & Piotrek, thanks for these inputs,
>
> Maybe what I expressed was not clear. For the implementation, I want to know
> what you think, rather than insisting on building another set from scratch. Piotrek,
> you are right, the implementation is part of this FLIP too, because we can
> not list all the details in the FLIP, so the implementation does affect
> user-facing behavior. And the maintenance / development costs are also relevant points.
>
> I think you have already persuaded me. I am thinking about basing this on the
> StreamingFileSink. And extending the StreamingFileSink can solve the "partition
> commit" requirement, which I have tried in my POC. And it is true, the Recoverable
> machinery and the S3 support are also important.
> So I listed "What is missing" for the StreamingFileSink in the previous mail. (It
> is not a defense for making another set from scratch.)
>
> Hi Stephan,
>
> >> The FLIP is "Filesystem connector in Table", it's about building up
> Flink
> Table's capabilities.
>
> What I mean is that this is not just for Hive; this FLIP is for Table. So we
> don't need to do all things for Hive. But Hive is also a "format" (or
> something else) of the Filesystem connector. Its requirements can be
> considered.
>
> I think you are right about the design, and let me take this seriously: a
> unified way makes us stronger, with less confusion, fewer surprises, and a more
> rigorous design. And I am pretty sure the Table work is good for enhancing the
> DataStream API too.
>
> Hi Kostas,
>
> Yes, Parquet and Orc are the main formats. Good to know your support~
>
> I think the streaming file sink and the file system connector are important
> to Flink, and it is a good time to think about these common requirements and
> about batch support; it is not just about Table, it is for the whole of Flink too.
> If there are some requirements that are hard to support or that violate the existing
> design, exclude them.
>
> Best,
> Jingsong Lee
>
>
> On Wed, Mar 18, 2020 at 10:31 PM Piotr Nowojski 
> wrote:
>
> > Hi Kurt,
> >
> > +1 for having some offline discussion on this topic.
> >
> > But I think the question of using the StreamingFileSink versus implementing
> > a subset of its features from scratch is quite a fundamental design decision,
> > with impact on the behaviour of the Public API, so I wouldn't discard it as a
> > technical detail and it should be included as part of the FLIP (I know it
> > could be argued in the opposite direction).
> >
> > Piotrek
> >
> > > On 18 Mar 2020, at 13:55, Kurt Young  wrote:
> > >
> > > Hi all,
> > >
> > > Thanks for the discussion and feedback. I think this FLIP doesn't imply
> the
> > > implementation
> > > of such a connector yet; it only describes the functionality and expected
> > > behaviors from the user's
> > > perspective. Reusing the current StreamingFileSink is definitely one of the
> > > possible ways to
> > > implement it. Since there are lots of details, I would suggest we
> can
> > > have an offline meeting
> > > to discuss how these could be achieved by extending
> StreamingFileSink,
> > > and how much
> > > effort we need to put into it. What do you think?
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Wed, Mar 18, 2020 at 7:21 PM Kostas Kloudas 
> > wrote:
> > >
> > >> Hi all,
> > >>
> > >> I also agree with Stephan on this!
> > >>
> > >> It has been more than a year now that most of our efforts have had
> > >> "unify" / "unification" / etc. either in their title or at their core,
> > >> and this has been the focus of all our resources. By deviating from
> > >> this now, we only put more stress on other teams in the future. When
> > >> the users start using a given API, with high probability, they will
> > >> ask (
