Resuming Savepoint issue with upgraded Flink version 1.11.2

2020-10-22 Thread Partha Mishra
Hi,

We are trying to save a checkpoint for one of the Flink jobs running on Flink 
version 1.9 and tried to resume the same job on Flink version 1.11.2. We are 
getting the error below when trying to restore the saved checkpoint in the 
newer Flink version. Can someone please help?

Cannot map checkpoint/savepoint state for operator 
fbb4ef531e002f8fb3a2052db255adf5 to the new program, because the operator is 
not available in the new program.


Complete stack trace (from the REST response "errors" field):

org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    ... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 7 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
    ... 10 more
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
    at com.man.ceon.cep.jobs.AnalyticService$.main(AnalyticService.scala:108)
    at com.man.ceon.cep.jobs.AnalyticService.main(AnalyticService.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    ... 13 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
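
For context, this error usually means that operator state stored in the savepoint can no 
longer be mapped to an operator in the new job graph, for example because auto-generated 
operator IDs changed between Flink versions or because the topology changed. A minimal 
sketch of pinning operator IDs with explicit uid() calls so that state keeps mapping 
across upgrades (the operator names below are illustrative, not from our actual job):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                .uid("words-source")      // stable ID: savepoint state is matched by this uid
                .map(String::toUpperCase)
                .uid("uppercase-map")     // give every stateful operator its own uid
                .print();

        env.execute("uid-example");
    }
}

If the missing operator was removed on purpose, the job can also be resumed with the 
--allowNonRestoredState flag of `flink run`, which skips state that cannot be mapped.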
 

Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-22 Thread Jark Wu
Hi Timo,

I have some concerns about `kafka-cdc`:
1) CDC is an abbreviation of Change Data Capture, which is commonly used for
databases, not for message queues.
2) Usually, CDC produces the full content of the changelog, including
UPDATE_BEFORE; however, "upsert kafka" doesn't.
3) `kafka-cdc` sounds like native support for the `debezium-json` format;
however, it is not, and we don't even want "upsert kafka" to support
"debezium-json".


Hi Jingsong,

I think the terminology of "upsert" is fine, because Kafka also uses
"upsert" to define such behavior in their official documentation [1]:

> a data record in a changelog stream is interpreted as an UPSERT aka
INSERT/UPDATE

Materialize uses the "UPSERT" keyword to define such behavior too [2].
Users have been requesting such a feature using "upsert kafka" terminology in
the user mailing lists [3][4].
Many other systems support an "UPSERT" statement natively, such as Impala [5],
SAP [6], Phoenix [7], and Oracle NoSQL [8].

Therefore, I think we don't need to be afraid of introducing the "upsert"
terminology; it is widely accepted by users.

Best,
Jark


[1]:
https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#streams_concepts_ktable
[2]:
https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
[3]:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/SQL-materialized-upsert-tables-td18482.html#a18503
[4]:
http://apache-flink.147419.n8.nabble.com/Kafka-Sink-AppendStreamTableSink-doesn-t-support-consuming-update-changes-td5959.html
[5]: https://impala.apache.org/docs/build/html/topics/impala_upsert.html
[6]:
https://help.sap.com/viewer/7c78579ce9b14a669c1f3295b0d8ca16/Cloud/en-US/ea8b6773be584203bcd99da76844c5ed.html
[7]: https://phoenix.apache.org/atomic_upsert.html
[8]:
https://docs.oracle.com/en/database/other-databases/nosql-database/18.3/sqlfornosql/adding-table-rows-using-insert-and-upsert-statements.html

On Fri, 23 Oct 2020 at 10:36, Jingsong Li  wrote:

> The `kafka-cdc` looks good to me.
> We can even give options to indicate whether to turn on compact, because
> compact is just an optimization?
>
> - ktable let me think about KSQL.
> - kafka-compacted it is not just compacted, more than that, it still has
> the ability of CDC
> - upsert-kafka , upsert is back, and I don't really want to see it again
> since we have CDC
>
> Best,
> Jingsong
>
> On Fri, Oct 23, 2020 at 2:21 AM Timo Walther  wrote:
>
> > Hi Jark,
> >
> > I would be fine with `connector=upsert-kafka`. Another idea would be to
> > align the name to other available Flink connectors [1]:
> >
> > `connector=kafka-cdc`.
> >
> > Regards,
> > Timo
> >
> > [1] https://github.com/ververica/flink-cdc-connectors
> >
> > On 22.10.20 17:17, Jark Wu wrote:
> > > Another name is "connector=upsert-kafka', I think this can solve Timo's
> > > concern on the "compacted" word.
> > >
> > > Materialize also uses "ENVELOPE UPSERT" [1] keyword to identify such
> > kafka
> > > sources.
> > > I think "upsert" is a well-known terminology widely used in many
> systems
> > > and matches the
> > >   behavior of how we handle the kafka messages.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Jark
> > >
> > > [1]:
> > >
> >
> https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> > >
> > >
> > >
> > >
> > > On Thu, 22 Oct 2020 at 22:53, Kurt Young  wrote:
> > >
> > >> Good validation messages can't solve the broken user experience,
> > especially
> > >> that
> > >> such update mode option will implicitly make half of current kafka
> > options
> > >> invalid or doesn't
> > >> make sense.
> > >>
> > >> Best,
> > >> Kurt
> > >>
> > >>
> > >> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu  wrote:
> > >>
> > >>> Hi Timo, Seth,
> > >>>
> > >>> The default value "inserting" of "mode" might be not suitable,
> > >>> because "debezium-json" emits changelog messages which include
> updates.
> > >>>
> > >>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman 
> wrote:
> > >>>
> >  +1 for supporting upsert results into Kafka.
> > 
> >  I have no comments on the implementation details.
> > 
> >  As far as configuration goes, I tend to favor Timo's option where we
> > >> add
> > >>> a
> >  "mode" property to the existing Kafka table with default value
> > >>> "inserting".
> >  If the mode is set to "updating" then the validation changes to the
> > new
> >  requirements. I personally find it more intuitive than a seperate
> >  connector, my fear is users won't understand its the same physical
> > >> kafka
> >  sink under the hood and it will lead to other confusion like does it
> > >>> offer
> >  the same persistence guarantees? I think we are capable of adding
> good
> >  valdiation messaging that solves Jark and Kurts concerns.
> > 
> > 
> >  On Thu, Oct 22, 2020 at 8:51 AM Timo Walther 
> > >> wrote:
> > 
> > > Hi Jark,
> > >
> > > "calling it "kafka-compacted" can even remind users to 

Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking Shuffle to Flink

2020-10-22 Thread Yingjie Cao
Hi Zhijiang,

Thanks for your reply and suggestions.

1. For
taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition, we
decided to append all data produced by one result partition to one file, so
this option will be removed.

2. For
taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition, the
number of buffers required by the buffer pool will be min(numSubpartitions + 1,
this config value), so it does not increase the number of required buffers and
may even reduce it when the parallelism is very high. When users switch to the
sort-merge implementation, there should be no "insufficient network buffers"
issue.
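
As a rough worked example of that sizing rule (the 1000-subpartition figure and the
512-buffer config value below are assumed for illustration only, not proposed defaults):

// Illustrative only: compare required buffers for the existing hash-based result
// partition (numSubpartitions + 1, as implied above) with the proposed sort-merge one.
public class RequiredBuffersExample {
    public static void main(String[] args) {
        int numSubpartitions = 1000;                 // assumed job parallelism
        int configuredBuffersPerPartition = 512;     // assumed config value

        int hashBased = numSubpartitions + 1;
        int sortMerge = Math.min(numSubpartitions + 1, configuredBuffersPerPartition);

        System.out.println("hash-based required buffers: " + hashBased);  // 1001
        System.out.println("sort-merge required buffers: " + sortMerge);  // 512
    }
}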

3. For taskmanager.network.sort-merge-blocking-shuffle.min-parallelism, I
agree that a boolean value is easier for users to configure, so we will replace
this option with a boolean switch. We can add this config option back if we
have performance concerns in the future.

Best,
Yingjie

Zhijiang wrote on Monday, Oct 19, 2020 at 5:27 PM:

> Thanks for launching the discussion and the respective FLIP, Yingjie!
>
> In general, I am +1 for this proposal since sort-merge ability has already
> been taken widely in other batch-based project, like MR, Spark, etc.
> And it indeed has some performance benefits in some scenarios as mentioned
> in FLIP.
>
> I only have some thoughts on the `Public Interfaces` section since it
> concerns how users understand and use the feature in practice.
> As for the newly introduced classes, they can be further reviewed in the
> follow-up PR since there is no refactoring of existing interfaces ATM.
>
> 1.
> taskmanager.network.sort-merge-blocking-shuffle.max-files-per-partition:
> the default value should be `1` I guess?  It is better to give a proper
> default value that most of users do not need to
>  care about it in practice.
>
> 2. taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition:
> how about making the default for the number of required buffers in
> LocalBufferPool as now for result partition?
>  Then it is transparent for users to not increase any memory resource no
> matter with either hash based or sort-merge based way. For the tuned
> setting , it is better to give some hints to guide
>  users how to adjust it for better performance based on some factors.
>
> 3. taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: I
> guess it is not very easy or determined to give a proper value for the
> switch between hash based and sort-merge based.
>  And how much data a subpartition taking (broadcast) or not suitable for
> hash based is not completely decided by the number of parallelism somehow.
> And users might be confused how to tune
>  it in practice. I prefer to giving a simple boolean type option for easy
> use and the default value can be false in MVP. Then it will not bring any
> effects for users after upgrade to new version by default,
>  and sort-merge option can be enabled to try out if users willing in
> desired scenarios.
>
> Best,
> Zhijiang
> --
> From:Till Rohrmann 
> Send Time: Friday, Oct 16, 2020 15:42
> To:dev 
> Subject:Re: [DISCUSS] FLIP-148: Introduce Sort-Merge Based Blocking
> Shuffle to Flink
>
> Thanks for sharing the preliminary numbers with us Yingjie. The numbers
> look quite impressive :-)
>
> Cheers,
> Till
>
> On Thu, Oct 15, 2020 at 5:25 PM Yingjie Cao 
> wrote:
>
> > Hi Till,
> >
> > Thanks for your reply and comments.
> >
> > You are right, the proposed sort-merge based shuffle is an extension of
> the
> > existing blocking shuffle and does not change any default behavior of
> > Flink.
> >
> > As for the performance, according to our previous experience, sort-merge
> > based implementation can reduce the shuffle time by 30% to even 90%
> > compared to hash-based implementation. My PoC implementation without any
> > further optimization can already reduce the shuffle time over 10% on SSD
> > and over 70% on HDD for a simple 1000 * 1000 parallelism benchmark job.
> >
> > After switching to the sort-merge based blocking shuffle, some of our
> > users' jobs can scale up to over 2 parallelism, though this needs some JM
> > and RM side optimization. I haven't ever tried to find where the upper
> > bound is, but I guess several tens of thousands should be able to meet
> > the needs of most users.
> >
> > Best,
> > Yingjie
> >
> > Till Rohrmann wrote on Thursday, Oct 15, 2020 at 3:57 PM:
> >
> > > Hi Yingjie,
> > >
> > > thanks for proposing the sort-merge based blocking shuffle. I like the
> > > proposal and it does not seem to change the internals of Flink. Instead
> > it
> > > is an extension of existing interfaces which makes it a
> > > non-invasive addition.
> > >
> > > Do you have any numbers comparing the performance of the sort-merge
> based
> > > shuffle against the hash-based shuffle? To what parallelism can you
> scale
> > > up when using the 

[jira] [Created] (FLINK-19776) cancel with savePoint throw Unrecognized field "status"

2020-10-22 Thread tonychan (Jira)
tonychan created FLINK-19776:


 Summary: cancel with savePoint  throw  Unrecognized field "status"
 Key: FLINK-19776
 URL: https://issues.apache.org/jira/browse/FLINK-19776
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.8.3
Reporter: tonychan
 Attachments: image-2020-10-23-10-56-13-197.png, 
image-2020-10-23-10-56-44-838.png, image-2020-10-23-10-57-50-418.png

Job running in yarn-per-job mode.

!image-2020-10-23-10-56-44-838.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-22 Thread Jingsong Li
The `kafka-cdc` name looks good to me.
We could even provide an option to indicate whether to turn on compaction,
because compaction is just an optimization?

- ktable makes me think of KSQL.
- kafka-compacted: it is not just about compaction; more than that, it still
has the CDC ability.
- upsert-kafka: upsert is back, and I don't really want to see it again
since we have CDC.

Best,
Jingsong

On Fri, Oct 23, 2020 at 2:21 AM Timo Walther  wrote:

> Hi Jark,
>
> I would be fine with `connector=upsert-kafka`. Another idea would be to
> align the name to other available Flink connectors [1]:
>
> `connector=kafka-cdc`.
>
> Regards,
> Timo
>
> [1] https://github.com/ververica/flink-cdc-connectors
>
> On 22.10.20 17:17, Jark Wu wrote:
> > Another name is "connector=upsert-kafka', I think this can solve Timo's
> > concern on the "compacted" word.
> >
> > Materialize also uses "ENVELOPE UPSERT" [1] keyword to identify such
> kafka
> > sources.
> > I think "upsert" is a well-known terminology widely used in many systems
> > and matches the
> >   behavior of how we handle the kafka messages.
> >
> > What do you think?
> >
> > Best,
> > Jark
> >
> > [1]:
> >
> https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic
> >
> >
> >
> >
> > On Thu, 22 Oct 2020 at 22:53, Kurt Young  wrote:
> >
> >> Good validation messages can't solve the broken user experience,
> especially
> >> that
> >> such update mode option will implicitly make half of current kafka
> options
> >> invalid or doesn't
> >> make sense.
> >>
> >> Best,
> >> Kurt
> >>
> >>
> >> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu  wrote:
> >>
> >>> Hi Timo, Seth,
> >>>
> >>> The default value "inserting" of "mode" might be not suitable,
> >>> because "debezium-json" emits changelog messages which include updates.
> >>>
> >>> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman  wrote:
> >>>
>  +1 for supporting upsert results into Kafka.
> 
>  I have no comments on the implementation details.
> 
>  As far as configuration goes, I tend to favor Timo's option where we
> >> add
> >>> a
>  "mode" property to the existing Kafka table with default value
> >>> "inserting".
>  If the mode is set to "updating" then the validation changes to the
> new
>  requirements. I personally find it more intuitive than a seperate
>  connector, my fear is users won't understand its the same physical
> >> kafka
>  sink under the hood and it will lead to other confusion like does it
> >>> offer
>  the same persistence guarantees? I think we are capable of adding good
>  valdiation messaging that solves Jark and Kurts concerns.
> 
> 
>  On Thu, Oct 22, 2020 at 8:51 AM Timo Walther 
> >> wrote:
> 
> > Hi Jark,
> >
> > "calling it "kafka-compacted" can even remind users to enable log
> > compaction"
> >
> > But sometimes users like to store a lineage of changes in their
> >> topics.
> > Indepent of any ktable/kstream interpretation.
> >
> > I let the majority decide on this topic to not further block this
> > effort. But we might find a better name like:
> >
> > connector = kafka
> > mode = updating/inserting
> >
> > OR
> >
> > connector = kafka-updating
> >
> > ...
> >
> > Regards,
> > Timo
> >
> >
> >
> >
> > On 22.10.20 15:24, Jark Wu wrote:
> >> Hi Timo,
> >>
> >> Thanks for your opinions.
> >>
> >> 1) Implementation
> >> We will have an stateful operator to generate INSERT and
> >>> UPDATE_BEFORE.
> >> This operator is keyby-ed (primary key as the shuffle key) after
> >> the
> > source
> >> operator.
> >> The implementation of this operator is very similar to the existing
> >> `DeduplicateKeepLastRowFunction`.
> >> The operator will register a value state using the primary key
> >> fields
>  as
> >> keys.
> >> When the value state is empty under current key, we will emit
> >> INSERT
>  for
> >> the input row.
> >> When the value state is not empty under current key, we will emit
> >> UPDATE_BEFORE using the row in state,
> >> and emit UPDATE_AFTER using the input row.
> >> When the input row is DELETE, we will clear state and emit DELETE
> >>> row.
> >>
> >> 2) new option vs new connector
> >>> We recently simplified the table options to a minimum amount of
> >> characters to be as concise as possible in the DDL.
> >> I think this is the reason why we want to introduce a new
> >> connector,
> >> because we can simplify the options in DDL.
> >> For example, if using a new option, the DDL may look like this:
> >>
> >> CREATE TABLE users (
> >> user_id BIGINT,
> >> user_name STRING,
> >> user_level STRING,
> >> region STRING,
> >> PRIMARY KEY (user_id) NOT ENFORCED
> >> ) WITH (
> >> 'connector' = 'kafka',
> >> 'model' = 'table',
> >> 'topic' = 

Re: [VOTE] NEW FLIP-102: Add More Metrics to TaskManager

2020-10-22 Thread Xintong Song
Thanks Yadong, Mattias and Lining for reviving this FLIP.

I've seen so many users confused by the current web UI page of task manager
metrics. This FLIP should definitely help them understand the memory
footprints and tune the configurations for task managers.

The design part of this proposal looks really good to me. The UI is clear
and easy to understand. The metrics look correct to me.

KIND REMINDER: I think the section `Implementation Proposal` in the FLIP
doc needs to be updated, so that we can vote on this FLIP. Currently, all
the tickets listed are closed.

Thank you~

Xintong Song



On Thu, Oct 22, 2020 at 5:53 PM Yadong Xie  wrote:

> Hi all
>
> I want to start a new vote for FLIP-102, which proposes to add more metrics
> to the task manager in web UI.
>
> The new FLIP-102 was revisited and adapted following the old ML discussion
> <
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-102-Add-More-Metrics-to-TaskManager-td37898.html
> >
> .
>
> Thanks to Matthias and Lining's effort, more metrics are available. We can
> match most of the effective configuration to the metrics just as Flink Doc
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model
> >
> describes now.
>
> The vote will last for at least 72 hours, following the consensus voting.
>
>
> FLIP-102 wiki:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-102%3A+Add+More+Metrics+to+TaskManager
>
>
> Discussion thread:
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-102-Add-More-Metrics-to-TaskManager-td37898.html
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-75-Flink-Web-UI-Improvement-Proposal-td33540.html
>
> Thanks,
>
> Yadong
>


[jira] [Created] (FLINK-19775) SystemProcessingTimeServiceTest.testImmediateShutdown is instable

2020-10-22 Thread Dian Fu (Jira)
Dian Fu created FLINK-19775:
---

 Summary: SystemProcessingTimeServiceTest.testImmediateShutdown is 
instable
 Key: FLINK-19775
 URL: https://issues.apache.org/jira/browse/FLINK-19775
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.11.0
Reporter: Dian Fu


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=8131=logs=d89de3df-4600-5585-dadc-9bbc9a5e661c=66b5c59a-0094-561d-0e44-b149dfdd586d

{code}
2020-10-22T21:12:54.9462382Z [ERROR] 
testImmediateShutdown(org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest)
  Time elapsed: 0.009 s  <<< ERROR!
2020-10-22T21:12:54.9463024Z java.lang.InterruptedException
2020-10-22T21:12:54.9463331Zat java.lang.Object.wait(Native Method)
2020-10-22T21:12:54.9463766Zat java.lang.Object.wait(Object.java:502)
2020-10-22T21:12:54.9464140Zat 
org.apache.flink.core.testutils.OneShotLatch.await(OneShotLatch.java:63)
2020-10-22T21:12:54.9466014Zat 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest.testImmediateShutdown(SystemProcessingTimeServiceTest.java:154)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: NullPointerException when trying to read null array in Postgres using JDBC Connector

2020-10-22 Thread Dylan Forciea
Looking at this, it’s a simple enough fix. My question would just be around a 
unit test that tests this particular bug. It doesn’t look like there is 
anything that directly tests the Postgres row converter. There is a test that 
utilizes Derby, but it looks like only Postgres supports arrays as far as I can 
tell.

I could create a unit test for the PostgresRowConverter and test the array 
functionality in it.  If that sounds like a good plan, I’d be willing to create 
a PR to fix this issue.
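
For reference, a minimal sketch of the kind of null guard I have in mind, written here as a 
standalone converter for illustration rather than the actual PostgresRowConverter internals 
(the interface and class names below are made up):

import java.sql.Array;

// Illustrative only: a null-safe array converter. The real converter maps to Flink's
// internal data structures; this just shows the null check in front of the
// java.sql.Array access that currently throws the NPE.
@FunctionalInterface
interface JdbcFieldConverter {
    Object convert(Object jdbcValue) throws Exception;
}

class NullSafeArrayConverter {
    static JdbcFieldConverter create(JdbcFieldConverter elementConverter) {
        return jdbcValue -> {
            if (jdbcValue == null) {
                return null;                 // NULL column -> null field, no NPE
            }
            Object[] elements = (Object[]) ((Array) jdbcValue).getArray();
            Object[] converted = new Object[elements.length];
            for (int i = 0; i < elements.length; i++) {
                converted[i] = elements[i] == null ? null : elementConverter.convert(elements[i]);
            }
            return converted;
        };
    }
}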

Regards,
Dylan Forciea

From: Dylan Forciea 
Date: Thursday, October 22, 2020 at 8:58 AM
To: Danny Chan 
Cc: "u...@flink.apache.org" , "dev@flink.apache.org" 

Subject: Re: NullPointerException when trying to read null array in Postgres 
using JDBC Connector

Danny,

Thanks! I have created a new JIRA issue [1]. I’ll look into how hard it is to 
get a patch and unit test myself, although I may need a hand on the process of 
making a change to both the master branch and a release branch if it is desired 
to get a fix into 1.11.

Regards,
Dylan Forciea

[1] https://issues.apache.org/jira/browse/FLINK-19771

From: Danny Chan 
Date: Thursday, October 22, 2020 at 4:34 AM
To: Dylan Forciea 
Cc: Flink ML 
Subject: Re: NullPointerException when trying to read null array in Postgres 
using JDBC Connector

Yes, the current code throws directly for NULLs, can you log an issue there ?

Dylan Forciea <dy...@oseberg.io> wrote on Wednesday, Oct 21, 2020 at 4:30 AM:
I believe I am getting an error because I have a nullable postgres array of 
text that is set to NULL that I’m reading using the JDBC SQL Connector. Is this 
something that should be allowed? Looking at the source code line below, it 
doesn’t look like the case of an array being null would be handled.

[error] Caused by: java.io.IOException: Couldn't access resultSet
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:266)
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:57)
[error]   at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
[error]   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
[error]   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
[error]   at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
[error] Caused by: java.lang.NullPointerException
[error]   at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$c06ce9f4$2(PostgresRowConverter.java:97)
[error]   at 
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:79)
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:259)
[error]   ... 5 more

Thanks,
Dylan Forciea


Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-22 Thread Timo Walther

Hi Jark,

I would be fine with `connector=upsert-kafka`. Another idea would be to 
align the name to other available Flink connectors [1]:


`connector=kafka-cdc`.

Regards,
Timo

[1] https://github.com/ververica/flink-cdc-connectors

On 22.10.20 17:17, Jark Wu wrote:

Another name is "connector=upsert-kafka"; I think this can solve Timo's
concern about the "compacted" word.

Materialize also uses "ENVELOPE UPSERT" [1] keyword to identify such kafka
sources.
I think "upsert" is a well-known terminology widely used in many systems
and matches the
  behavior of how we handle the kafka messages.

What do you think?

Best,
Jark

[1]:
https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic




On Thu, 22 Oct 2020 at 22:53, Kurt Young  wrote:


Good validation messages can't solve the broken user experience, especially
since such an update mode option will implicitly make half of the current
Kafka options invalid or meaningless.

Best,
Kurt


On Thu, Oct 22, 2020 at 10:31 PM Jark Wu  wrote:


Hi Timo, Seth,

The default value "inserting" for "mode" might not be suitable,
because "debezium-json" emits changelog messages which include updates.

On Thu, 22 Oct 2020 at 22:10, Seth Wiesman  wrote:


+1 for supporting upsert results into Kafka.

I have no comments on the implementation details.

As far as configuration goes, I tend to favor Timo's option where we add a
"mode" property to the existing Kafka table with default value "inserting".
If the mode is set to "updating" then the validation changes to the new
requirements. I personally find it more intuitive than a separate connector;
my fear is users won't understand it's the same physical Kafka sink under the
hood and it will lead to other confusion, like does it offer the same
persistence guarantees? I think we are capable of adding good validation
messaging that solves Jark's and Kurt's concerns.


On Thu, Oct 22, 2020 at 8:51 AM Timo Walther wrote:



Hi Jark,

"calling it "kafka-compacted" can even remind users to enable log
compaction"

But sometimes users like to store a lineage of changes in their topics,
independent of any ktable/kstream interpretation.

I let the majority decide on this topic to not further block this
effort. But we might find a better name like:

connector = kafka
mode = updating/inserting

OR

connector = kafka-updating

...

Regards,
Timo




On 22.10.20 15:24, Jark Wu wrote:

Hi Timo,

Thanks for your opinions.

1) Implementation
We will have a stateful operator to generate INSERT and UPDATE_BEFORE. This
operator is keyBy-ed (primary key as the shuffle key) after the source
operator. The implementation of this operator is very similar to the existing
`DeduplicateKeepLastRowFunction`. The operator will register a value state
using the primary key fields as keys. When the value state is empty under the
current key, we will emit INSERT for the input row. When the value state is
not empty under the current key, we will emit UPDATE_BEFORE using the row in
state, and emit UPDATE_AFTER using the input row. When the input row is
DELETE, we will clear the state and emit a DELETE row.
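
A minimal sketch of this kind of operator, written as a KeyedProcessFunction over Row for
illustration (the class, state, and field names here are just examples; the actual operator
would be an internal planner implementation):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

// Turns an upsert stream into a full changelog (INSERT / UPDATE_BEFORE /
// UPDATE_AFTER / DELETE), assuming the stream is keyBy-ed on the primary key.
public class UpsertToChangelogFunction extends KeyedProcessFunction<Row, Row, Row> {

    private transient ValueState<Row> lastRow;  // last image seen for the current primary key

    @Override
    public void open(Configuration parameters) {
        lastRow = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-row", Row.class));
    }

    @Override
    public void processElement(Row input, Context ctx, Collector<Row> out) throws Exception {
        Row previous = lastRow.value();

        if (input.getKind() == RowKind.DELETE) {
            // DELETE from the source: clear state and emit a DELETE with all columns filled
            lastRow.clear();
            if (previous != null) {
                previous.setKind(RowKind.DELETE);
                out.collect(previous);
            }
            return;
        }

        if (previous == null) {
            // first row under this key -> INSERT
            input.setKind(RowKind.INSERT);
        } else {
            // key already known -> old image as UPDATE_BEFORE, new one as UPDATE_AFTER
            previous.setKind(RowKind.UPDATE_BEFORE);
            out.collect(previous);
            input.setKind(RowKind.UPDATE_AFTER);
        }
        lastRow.update(input);
        out.collect(input);
    }
}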


2) new option vs new connector

"We recently simplified the table options to a minimum amount of characters
to be as concise as possible in the DDL."
I think this is the reason why we want to introduce a new connector, because
we can simplify the options in DDL.
For example, if using a new option, the DDL may look like this:

CREATE TABLE users (
user_id BIGINT,
user_name STRING,
user_level STRING,
region STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'model' = 'table',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest',
'key.format' = 'csv',
'key.fields' = 'user_id',
'value.format' = 'avro',
'sink.partitioner' = 'hash'
);

If using a new connector, we can have a different default value for the
options and remove unnecessary options, the DDL can look like this which is
much more concise:

CREATE TABLE pageviews_per_region (
user_id BIGINT,
user_name STRING,
user_level STRING,
region STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'kafka-compacted',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'key.format' = 'csv',
'value.format' = 'avro'
);


"When people read `connector=kafka-compacted` they might not know that it has
ktable semantics. You don't need to enable log compaction in order to use a
KTable as far as I know."

We don't need to let users know it has ktable semantics; as Konstantin
mentioned, this may carry more implicit meaning than we want to imply here.
I agree users don't need to enable log compaction, but from the production
perspective, log compaction should always be enabled if it is used for this
purpose. Calling it "kafka-compacted" can even 

[jira] [Created] (FLINK-19774) Introduce sub partition view version for approximate Failover

2020-10-22 Thread Yuan Mei (Jira)
Yuan Mei created FLINK-19774:


 Summary: Introduce sub partition view version for approximate 
Failover
 Key: FLINK-19774
 URL: https://issues.apache.org/jira/browse/FLINK-19774
 Project: Flink
  Issue Type: Sub-task
Reporter: Yuan Mei






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19773) Exponential backoff restart strategy

2020-10-22 Thread Levi Ramsey (Jira)
Levi Ramsey created FLINK-19773:
---

 Summary: Exponential backoff restart strategy
 Key: FLINK-19773
 URL: https://issues.apache.org/jira/browse/FLINK-19773
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.11.2
Reporter: Levi Ramsey


There are situations where the current restart strategies (fixed-delay and 
failure-rate) seem to be suboptimal.  For example, in HDFS sinks, a delay 
between restarts shorter than the lease expiration time in HDFS is going to 
result in many restart attempts which fail, putting somewhat pointless stress 
on a cluster.  On the other hand, setting a delay of close to the lease 
expiration time will mean far more downtime than necessary when the cause of 
failure is something that works itself out quickly.

 

An exponential backoff restart strategy would address this.  For example a 
backoff strategy where the jobs are contending for a lease on a shared resource 
that terminates after 1200 seconds of inactivity might have successive delays 
of 1, 2, 4, 8, 16... 1024 seconds (after which a cumulative delay of more than 
1200 seconds has passed).

While not intrinsically tied to exponential backoff (it's more of an example of 
variable delay), in the case of many jobs failing due to an infrastructure 
failure, a thundering herd scenario can be mitigated by adding jitter to the 
delays, e.g. 0 -> 1 -> 2 -> 3/4/5 -> 5/6/7/8/9/10/11 seconds.  With this 
jitter, eventually a set of jobs competing to restart will spread out.
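
A minimal sketch of the kind of delay calculation described above (the initial delay, cap, 
and jitter fraction are illustrative parameters, not proposed defaults):

{code}
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative exponential-backoff-with-jitter delay calculation. */
public final class BackoffDelays {

    /**
     * @param attempt      restart attempt number, starting at 0
     * @param initialMs    delay for the first retry, e.g. 1000 ms
     * @param maxMs        upper bound on the delay, e.g. 1_024_000 ms
     * @param jitterFactor fraction of the delay to randomize, e.g. 0.5
     */
    static long delayMillis(int attempt, long initialMs, long maxMs, double jitterFactor) {
        // exponential growth: initial * 2^attempt, capped at maxMs
        double exp = initialMs * Math.pow(2, attempt);
        long capped = (long) Math.min(exp, maxMs);
        // jitter on a fraction of the delay so restarting jobs spread out
        long jitterRange = (long) (capped * jitterFactor);
        long jitter = jitterRange == 0
                ? 0
                : ThreadLocalRandom.current().nextLong(-jitterRange, jitterRange + 1);
        return Math.max(0, capped + jitter);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 11; attempt++) {
            System.out.printf("attempt %2d -> %d ms%n",
                    attempt, delayMillis(attempt, 1_000, 1_024_000, 0.5));
        }
    }
}
{code}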

(logging the ticket more to start a discussion and perhaps get context around 
if this had been considered and rejected, etc.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-22 Thread Jark Wu
Another name is "connector=upsert-kafka"; I think this can solve Timo's
concern about the "compacted" word.

Materialize also uses "ENVELOPE UPSERT" [1] keyword to identify such kafka
sources.
I think "upsert" is a well-known terminology widely used in many systems
and matches the
 behavior of how we handle the kafka messages.

What do you think?

Best,
Jark

[1]:
https://materialize.io/docs/sql/create-source/text-kafka/#upsert-on-a-kafka-topic




On Thu, 22 Oct 2020 at 22:53, Kurt Young  wrote:

> Good validation messages can't solve the broken user experience, especially
> that
> such update mode option will implicitly make half of current kafka options
> invalid or doesn't
> make sense.
>
> Best,
> Kurt
>
>
> On Thu, Oct 22, 2020 at 10:31 PM Jark Wu  wrote:
>
> > Hi Timo, Seth,
> >
> > The default value "inserting" of "mode" might be not suitable,
> > because "debezium-json" emits changelog messages which include updates.
> >
> > On Thu, 22 Oct 2020 at 22:10, Seth Wiesman  wrote:
> >
> > > +1 for supporting upsert results into Kafka.
> > >
> > > I have no comments on the implementation details.
> > >
> > > As far as configuration goes, I tend to favor Timo's option where we
> add
> > a
> > > "mode" property to the existing Kafka table with default value
> > "inserting".
> > > If the mode is set to "updating" then the validation changes to the new
> > > requirements. I personally find it more intuitive than a seperate
> > > connector, my fear is users won't understand its the same physical
> kafka
> > > sink under the hood and it will lead to other confusion like does it
> > offer
> > > the same persistence guarantees? I think we are capable of adding good
> > > valdiation messaging that solves Jark and Kurts concerns.
> > >
> > >
> > > On Thu, Oct 22, 2020 at 8:51 AM Timo Walther 
> wrote:
> > >
> > > > Hi Jark,
> > > >
> > > > "calling it "kafka-compacted" can even remind users to enable log
> > > > compaction"
> > > >
> > > > But sometimes users like to store a lineage of changes in their
> topics.
> > > > Indepent of any ktable/kstream interpretation.
> > > >
> > > > I let the majority decide on this topic to not further block this
> > > > effort. But we might find a better name like:
> > > >
> > > > connector = kafka
> > > > mode = updating/inserting
> > > >
> > > > OR
> > > >
> > > > connector = kafka-updating
> > > >
> > > > ...
> > > >
> > > > Regards,
> > > > Timo
> > > >
> > > >
> > > >
> > > >
> > > > On 22.10.20 15:24, Jark Wu wrote:
> > > > > Hi Timo,
> > > > >
> > > > > Thanks for your opinions.
> > > > >
> > > > > 1) Implementation
> > > > > We will have an stateful operator to generate INSERT and
> > UPDATE_BEFORE.
> > > > > This operator is keyby-ed (primary key as the shuffle key) after
> the
> > > > source
> > > > > operator.
> > > > > The implementation of this operator is very similar to the existing
> > > > > `DeduplicateKeepLastRowFunction`.
> > > > > The operator will register a value state using the primary key
> fields
> > > as
> > > > > keys.
> > > > > When the value state is empty under current key, we will emit
> INSERT
> > > for
> > > > > the input row.
> > > > > When the value state is not empty under current key, we will emit
> > > > > UPDATE_BEFORE using the row in state,
> > > > > and emit UPDATE_AFTER using the input row.
> > > > > When the input row is DELETE, we will clear state and emit DELETE
> > row.
> > > > >
> > > > > 2) new option vs new connector
> > > > >> We recently simplified the table options to a minimum amount of
> > > > > characters to be as concise as possible in the DDL.
> > > > > I think this is the reason why we want to introduce a new
> connector,
> > > > > because we can simplify the options in DDL.
> > > > > For example, if using a new option, the DDL may look like this:
> > > > >
> > > > > CREATE TABLE users (
> > > > >user_id BIGINT,
> > > > >user_name STRING,
> > > > >user_level STRING,
> > > > >region STRING,
> > > > >PRIMARY KEY (user_id) NOT ENFORCED
> > > > > ) WITH (
> > > > >'connector' = 'kafka',
> > > > >'model' = 'table',
> > > > >'topic' = 'pageviews_per_region',
> > > > >'properties.bootstrap.servers' = '...',
> > > > >'properties.group.id' = 'testGroup',
> > > > >'scan.startup.mode' = 'earliest',
> > > > >'key.format' = 'csv',
> > > > >'key.fields' = 'user_id',
> > > > >'value.format' = 'avro',
> > > > >'sink.partitioner' = 'hash'
> > > > > );
> > > > >
> > > > > If using a new connector, we can have a different default value for
> > the
> > > > > options and remove unnecessary options,
> > > > > the DDL can look like this which is much more concise:
> > > > >
> > > > > CREATE TABLE pageviews_per_region (
> > > > >user_id BIGINT,
> > > > >user_name STRING,
> > > > >user_level STRING,
> > > > >region STRING,
> > > > >PRIMARY KEY (user_id) NOT ENFORCED
> > > > > ) WITH (
> > > > >'connector' = 'kafka-compacted',
> > > > >'topic' = 

Re: Un-ignored Parsing Exceptions in the CsvFormat

2020-10-22 Thread Austin Cawley-Edwards
Hey Roman,

Sorry to miss this -- thanks for the confirmation and making the ticket.
I'm happy to propose a fix if someone is able to assign the ticket to me.

Best,
Austin

On Mon, Oct 19, 2020 at 6:56 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hey Austin,
>
> I think you are right. The problematic row contains an odd number of
> delimiters in which case skipFields will return -1, which in turn leads to
> an exception.
>
> I opened a bug ticket https://issues.apache.org/jira/browse/FLINK-19711
> to fix it.
>
> Regards,
> Roman
>
>
> On Fri, Oct 16, 2020 at 8:32 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey all,
>>
>> I'm ingesting CSV files with Flink 1.10.2 using SQL and the CSV
>> Format[1].
>>
>> Even with the `ignoreParseErrors()` set, the job fails when it encounters
>> some types of malformed rows. The root cause is indeed a `ParseException`,
>> so I'm wondering if there's anything more I need to do to ignore these
>> rows. Each field in the schema is a STRING.
>>
>>
>> I've configured the CSV format and table like so:
>>
>> tableEnv.connect(
>> new FileSystem()
>> .path(path)
>> )
>> .withFormat(
>> new Csv()
>> .quoteCharacter('"')
>> .ignoreParseErrors()
>> )
>> .withSchema(schema)
>> .inAppendMode()
>>
>>
>> Shot in the dark, but should `RowCsvInputFormat#parseRecord` have a check
>> to `isLenient()` if there is an unexpected parser position?[2]
>>
>> Example error:
>>
>> 2020-10-16 12:50:18
>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>> exception when processing split: null
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1098)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1066)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
>> Caused by: org.apache.flink.api.common.io.ParseException: Unexpected
>> parser position for column 1 of row '",
>> https://www.facebook.com/GoingOn-Networks-154758847925524/,https://www.linkedin.com/company/goingon,,
>> ""company,'
>> at
>> org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:204)
>> at
>> org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
>> at
>> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
>> at
>> org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:329)
>>
>>
>> Thanks,
>> Austin
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csv-format
>> [2]:
>> https://github.com/apache/flink/blob/c09e959cf55c549ca4a3673f72deeb12a34e12f5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L203-L206
>>
>


[jira] [Created] (FLINK-19772) Add utility for retrying sync operations with exponential timeout

2020-10-22 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-19772:


 Summary: Add utility for retrying sync operations with exponential 
timeout
 Key: FLINK-19772
 URL: https://issues.apache.org/jira/browse/FLINK-19772
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.12.0


The {{FutureUtils}} currently only allows retrying an operation with a fixed 
delay. There should also be variants that accept an exponential delay.
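
A minimal sketch of such a retry helper for synchronous operations, independent of the 
actual {{FutureUtils}} API (the names and signature below are illustrative):

{code}
import java.util.concurrent.Callable;

/** Illustrative retry helper with an exponentially growing, capped delay. */
public final class RetryWithExponentialDelay {

    static <T> T retry(Callable<T> operation, int maxRetries, long initialDelayMs, long maxDelayMs)
            throws Exception {
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                last = e;
                if (attempt == maxRetries) {
                    break;
                }
                Thread.sleep(delay);                      // back off before the next attempt
                delay = Math.min(delay * 2, maxDelayMs);  // exponential growth, capped
            }
        }
        throw last;
    }
}
{code}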



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-22 Thread Kurt Young
Good validation messages can't solve the broken user experience, especially
since such an update mode option will implicitly make half of the current
Kafka options invalid or meaningless.

Best,
Kurt


On Thu, Oct 22, 2020 at 10:31 PM Jark Wu  wrote:

> Hi Timo, Seth,
>
> The default value "inserting" of "mode" might be not suitable,
> because "debezium-json" emits changelog messages which include updates.
>
> On Thu, 22 Oct 2020 at 22:10, Seth Wiesman  wrote:
>
> > +1 for supporting upsert results into Kafka.
> >
> > I have no comments on the implementation details.
> >
> > As far as configuration goes, I tend to favor Timo's option where we add
> a
> > "mode" property to the existing Kafka table with default value
> "inserting".
> > If the mode is set to "updating" then the validation changes to the new
> > requirements. I personally find it more intuitive than a seperate
> > connector, my fear is users won't understand its the same physical kafka
> > sink under the hood and it will lead to other confusion like does it
> offer
> > the same persistence guarantees? I think we are capable of adding good
> > valdiation messaging that solves Jark and Kurts concerns.
> >
> >
> > On Thu, Oct 22, 2020 at 8:51 AM Timo Walther  wrote:
> >
> > > Hi Jark,
> > >
> > > "calling it "kafka-compacted" can even remind users to enable log
> > > compaction"
> > >
> > > But sometimes users like to store a lineage of changes in their topics.
> > > Indepent of any ktable/kstream interpretation.
> > >
> > > I let the majority decide on this topic to not further block this
> > > effort. But we might find a better name like:
> > >
> > > connector = kafka
> > > mode = updating/inserting
> > >
> > > OR
> > >
> > > connector = kafka-updating
> > >
> > > ...
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > >
> > >
> > > On 22.10.20 15:24, Jark Wu wrote:
> > > > Hi Timo,
> > > >
> > > > Thanks for your opinions.
> > > >
> > > > 1) Implementation
> > > > We will have an stateful operator to generate INSERT and
> UPDATE_BEFORE.
> > > > This operator is keyby-ed (primary key as the shuffle key) after the
> > > source
> > > > operator.
> > > > The implementation of this operator is very similar to the existing
> > > > `DeduplicateKeepLastRowFunction`.
> > > > The operator will register a value state using the primary key fields
> > as
> > > > keys.
> > > > When the value state is empty under current key, we will emit INSERT
> > for
> > > > the input row.
> > > > When the value state is not empty under current key, we will emit
> > > > UPDATE_BEFORE using the row in state,
> > > > and emit UPDATE_AFTER using the input row.
> > > > When the input row is DELETE, we will clear state and emit DELETE
> row.
> > > >
> > > > 2) new option vs new connector
> > > >> We recently simplified the table options to a minimum amount of
> > > > characters to be as concise as possible in the DDL.
> > > > I think this is the reason why we want to introduce a new connector,
> > > > because we can simplify the options in DDL.
> > > > For example, if using a new option, the DDL may look like this:
> > > >
> > > > CREATE TABLE users (
> > > >user_id BIGINT,
> > > >user_name STRING,
> > > >user_level STRING,
> > > >region STRING,
> > > >PRIMARY KEY (user_id) NOT ENFORCED
> > > > ) WITH (
> > > >'connector' = 'kafka',
> > > >'model' = 'table',
> > > >'topic' = 'pageviews_per_region',
> > > >'properties.bootstrap.servers' = '...',
> > > >'properties.group.id' = 'testGroup',
> > > >'scan.startup.mode' = 'earliest',
> > > >'key.format' = 'csv',
> > > >'key.fields' = 'user_id',
> > > >'value.format' = 'avro',
> > > >'sink.partitioner' = 'hash'
> > > > );
> > > >
> > > > If using a new connector, we can have a different default value for
> the
> > > > options and remove unnecessary options,
> > > > the DDL can look like this which is much more concise:
> > > >
> > > > CREATE TABLE pageviews_per_region (
> > > >user_id BIGINT,
> > > >user_name STRING,
> > > >user_level STRING,
> > > >region STRING,
> > > >PRIMARY KEY (user_id) NOT ENFORCED
> > > > ) WITH (
> > > >'connector' = 'kafka-compacted',
> > > >'topic' = 'pageviews_per_region',
> > > >'properties.bootstrap.servers' = '...',
> > > >'key.format' = 'csv',
> > > >'value.format' = 'avro'
> > > > );
> > > >
> > > >> When people read `connector=kafka-compacted` they might not know
> that
> > it
> > > >> has ktable semantics. You don't need to enable log compaction in
> order
> > > >> to use a KTable as far as I know.
> > > > We don't need to let users know it has ktable semantics, as
> Konstantin
> > > > mentioned this may carry more implicit
> > > > meaning than we want to imply here. I agree users don't need to
> enable
> > > log
> > > > compaction, but from the production perspective,
> > > > log compaction should always be enabled if it is used in this
> purpose.
> > > > Calling it "kafka-compacted" can even remind 

Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-22 Thread Jark Wu
Hi Timo, Seth,

The default value "inserting" for "mode" might not be suitable,
because "debezium-json" emits changelog messages which include updates.

On Thu, 22 Oct 2020 at 22:10, Seth Wiesman  wrote:

> +1 for supporting upsert results into Kafka.
>
> I have no comments on the implementation details.
>
> As far as configuration goes, I tend to favor Timo's option where we add a
> "mode" property to the existing Kafka table with default value "inserting".
> If the mode is set to "updating" then the validation changes to the new
> requirements. I personally find it more intuitive than a seperate
> connector, my fear is users won't understand its the same physical kafka
> sink under the hood and it will lead to other confusion like does it offer
> the same persistence guarantees? I think we are capable of adding good
> valdiation messaging that solves Jark and Kurts concerns.
>
>
> On Thu, Oct 22, 2020 at 8:51 AM Timo Walther  wrote:
>
> > Hi Jark,
> >
> > "calling it "kafka-compacted" can even remind users to enable log
> > compaction"
> >
> > But sometimes users like to store a lineage of changes in their topics.
> > Indepent of any ktable/kstream interpretation.
> >
> > I let the majority decide on this topic to not further block this
> > effort. But we might find a better name like:
> >
> > connector = kafka
> > mode = updating/inserting
> >
> > OR
> >
> > connector = kafka-updating
> >
> > ...
> >
> > Regards,
> > Timo
> >
> >
> >
> >
> > On 22.10.20 15:24, Jark Wu wrote:
> > > Hi Timo,
> > >
> > > Thanks for your opinions.
> > >
> > > 1) Implementation
> > > We will have an stateful operator to generate INSERT and UPDATE_BEFORE.
> > > This operator is keyby-ed (primary key as the shuffle key) after the
> > source
> > > operator.
> > > The implementation of this operator is very similar to the existing
> > > `DeduplicateKeepLastRowFunction`.
> > > The operator will register a value state using the primary key fields
> as
> > > keys.
> > > When the value state is empty under current key, we will emit INSERT
> for
> > > the input row.
> > > When the value state is not empty under current key, we will emit
> > > UPDATE_BEFORE using the row in state,
> > > and emit UPDATE_AFTER using the input row.
> > > When the input row is DELETE, we will clear state and emit DELETE row.
> > >
> > > 2) new option vs new connector
> > >> We recently simplified the table options to a minimum amount of
> > > characters to be as concise as possible in the DDL.
> > > I think this is the reason why we want to introduce a new connector,
> > > because we can simplify the options in DDL.
> > > For example, if using a new option, the DDL may look like this:
> > >
> > > CREATE TABLE users (
> > >user_id BIGINT,
> > >user_name STRING,
> > >user_level STRING,
> > >region STRING,
> > >PRIMARY KEY (user_id) NOT ENFORCED
> > > ) WITH (
> > >'connector' = 'kafka',
> > >'model' = 'table',
> > >'topic' = 'pageviews_per_region',
> > >'properties.bootstrap.servers' = '...',
> > >'properties.group.id' = 'testGroup',
> > >'scan.startup.mode' = 'earliest',
> > >'key.format' = 'csv',
> > >'key.fields' = 'user_id',
> > >'value.format' = 'avro',
> > >'sink.partitioner' = 'hash'
> > > );
> > >
> > > If using a new connector, we can have a different default value for the
> > > options and remove unnecessary options,
> > > the DDL can look like this which is much more concise:
> > >
> > > CREATE TABLE pageviews_per_region (
> > >user_id BIGINT,
> > >user_name STRING,
> > >user_level STRING,
> > >region STRING,
> > >PRIMARY KEY (user_id) NOT ENFORCED
> > > ) WITH (
> > >'connector' = 'kafka-compacted',
> > >'topic' = 'pageviews_per_region',
> > >'properties.bootstrap.servers' = '...',
> > >'key.format' = 'csv',
> > >'value.format' = 'avro'
> > > );
> > >
> > >> When people read `connector=kafka-compacted` they might not know that
> it
> > >> has ktable semantics. You don't need to enable log compaction in order
> > >> to use a KTable as far as I know.
> > > We don't need to let users know it has ktable semantics, as Konstantin
> > > mentioned this may carry more implicit
> > > meaning than we want to imply here. I agree users don't need to enable
> > log
> > > compaction, but from the production perspective,
> > > log compaction should always be enabled if it is used in this purpose.
> > > Calling it "kafka-compacted" can even remind users to enable log
> > compaction.
> > >
> > > I don't agree to introduce "model = table/stream" option, or
> > > "connector=kafka-table",
> > > because this means we are introducing Table vs Stream concept from
> KSQL.
> > > However, we don't have such top-level concept in Flink SQL now, this
> will
> > > further confuse users.
> > > In Flink SQL, all the things are STREAM, the differences are whether it
> > is
> > > bounded or unbounded,
> > >   whether it is insert-only or changelog.
> > >
> > >
> > > Best,
> > > 

Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-22 Thread Seth Wiesman
+1 for supporting upsert results into Kafka.

I have no comments on the implementation details.

As far as configuration goes, I tend to favor Timo's option where we add a
"mode" property to the existing Kafka table with default value "inserting".
If the mode is set to "updating" then the validation changes to the new
requirements. I personally find it more intuitive than a separate
connector; my fear is users won't understand it's the same physical Kafka
sink under the hood and it will lead to other confusion, like does it offer
the same persistence guarantees? I think we are capable of adding good
validation messaging that solves Jark's and Kurt's concerns.


On Thu, Oct 22, 2020 at 8:51 AM Timo Walther  wrote:

> Hi Jark,
>
> "calling it "kafka-compacted" can even remind users to enable log
> compaction"
>
> But sometimes users like to store a lineage of changes in their topics.
> Indepent of any ktable/kstream interpretation.
>
> I let the majority decide on this topic to not further block this
> effort. But we might find a better name like:
>
> connector = kafka
> mode = updating/inserting
>
> OR
>
> connector = kafka-updating
>
> ...
>
> Regards,
> Timo
>
>
>
>
> On 22.10.20 15:24, Jark Wu wrote:
> > Hi Timo,
> >
> > Thanks for your opinions.
> >
> > 1) Implementation
> > We will have an stateful operator to generate INSERT and UPDATE_BEFORE.
> > This operator is keyby-ed (primary key as the shuffle key) after the
> source
> > operator.
> > The implementation of this operator is very similar to the existing
> > `DeduplicateKeepLastRowFunction`.
> > The operator will register a value state using the primary key fields as
> > keys.
> > When the value state is empty under current key, we will emit INSERT for
> > the input row.
> > When the value state is not empty under current key, we will emit
> > UPDATE_BEFORE using the row in state,
> > and emit UPDATE_AFTER using the input row.
> > When the input row is DELETE, we will clear state and emit DELETE row.
> >
> > 2) new option vs new connector
> >> We recently simplified the table options to a minimum amount of
> > characters to be as concise as possible in the DDL.
> > I think this is the reason why we want to introduce a new connector,
> > because we can simplify the options in DDL.
> > For example, if using a new option, the DDL may look like this:
> >
> > CREATE TABLE users (
> >user_id BIGINT,
> >user_name STRING,
> >user_level STRING,
> >region STRING,
> >PRIMARY KEY (user_id) NOT ENFORCED
> > ) WITH (
> >'connector' = 'kafka',
> >'model' = 'table',
> >'topic' = 'pageviews_per_region',
> >'properties.bootstrap.servers' = '...',
> >'properties.group.id' = 'testGroup',
> >'scan.startup.mode' = 'earliest',
> >'key.format' = 'csv',
> >'key.fields' = 'user_id',
> >'value.format' = 'avro',
> >'sink.partitioner' = 'hash'
> > );
> >
> > If using a new connector, we can have a different default value for the
> > options and remove unnecessary options,
> > the DDL can look like this which is much more concise:
> >
> > CREATE TABLE pageviews_per_region (
> >user_id BIGINT,
> >user_name STRING,
> >user_level STRING,
> >region STRING,
> >PRIMARY KEY (user_id) NOT ENFORCED
> > ) WITH (
> >'connector' = 'kafka-compacted',
> >'topic' = 'pageviews_per_region',
> >'properties.bootstrap.servers' = '...',
> >'key.format' = 'csv',
> >'value.format' = 'avro'
> > );
> >
> >> When people read `connector=kafka-compacted` they might not know that it
> >> has ktable semantics. You don't need to enable log compaction in order
> >> to use a KTable as far as I know.
> > We don't need to let users know it has ktable semantics, as Konstantin
> > mentioned this may carry more implicit
> > meaning than we want to imply here. I agree users don't need to enable
> log
> > compaction, but from the production perspective,
> > log compaction should always be enabled if it is used in this purpose.
> > Calling it "kafka-compacted" can even remind users to enable log
> compaction.
> >
> > I don't agree with introducing a "model = table/stream" option, or
> > "connector=kafka-table", because this means we are introducing the Table vs
> > Stream concept from KSQL. However, we don't have such a top-level concept
> > in Flink SQL today; this would further confuse users.
> > In Flink SQL, everything is a STREAM; the differences are whether it is
> > bounded or unbounded, and whether it is insert-only or a changelog.
> >
> >
> > Best,
> > Jark
> >
> >
> > On Thu, 22 Oct 2020 at 20:39, Timo Walther  wrote:
> >
> >> Hi Shengkai, Hi Jark,
> >>
> >> thanks for this great proposal. It is time to finally connect the
> >> changelog processor with a compacted Kafka topic.
> >>
> >> "The operator will produce INSERT rows, or additionally generate
> >> UPDATE_BEFORE rows for the previous image, or produce DELETE rows with
> >> all columns filled with values."
> >>
> >> Could you elaborate a bit on the implementation 

Re: NullPointerException when trying to read null array in Postgres using JDBC Connector

2020-10-22 Thread Dylan Forciea
Danny,

Thanks! I have created a new JIRA issue [1]. I’ll look into how hard it is to 
get a patch and unit test myself, although I may need a hand on the process of 
making a change to both the master branch and a release branch if it is desired 
to get a fix into 1.11.

Regards,
Dylan Forciea

[1] https://issues.apache.org/jira/browse/FLINK-19771

From: Danny Chan 
Date: Thursday, October 22, 2020 at 4:34 AM
To: Dylan Forciea 
Cc: Flink ML 
Subject: Re: NullPointerException when trying to read null array in Postgres 
using JDBC Connector

Yes, the current code throws directly for NULLs. Can you log an issue there?

Dylan Forciea (dy...@oseberg.io) wrote on Wednesday, October 21, 2020 at 4:30 AM:
I believe I am getting an error because I have a nullable Postgres text array
that is set to NULL, which I'm reading using the JDBC SQL connector. Is this
something that should be supported? Looking at the source code line below, it
doesn't look like the case of a null array is handled.

[error] Caused by: java.io.IOException: Couldn't access resultSet
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:266)
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:57)
[error]   at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)
[error]   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
[error]   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
[error]   at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
[error] Caused by: java.lang.NullPointerException
[error]   at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$c06ce9f4$2(PostgresRowConverter.java:97)
[error]   at 
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:79)
[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:259)
[error]   ... 5 more

Thanks,
Dylan Forciea


[jira] [Created] (FLINK-19771) NullPointerException when accessing null array from postgres in JDBC Connector

2020-10-22 Thread Dylan Forciea (Jira)
Dylan Forciea created FLINK-19771:
-

 Summary: NullPointerException when accessing null array from 
postgres in JDBC Connector
 Key: FLINK-19771
 URL: https://issues.apache.org/jira/browse/FLINK-19771
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.11.2
Reporter: Dylan Forciea


When trying to use the JDBC connector for Postgres, I tried to read in a text
array. When a row whose array was null was read, the connector threw an
exception and execution stopped. Looking at the source code, it appears that
if the array is null it will still attempt to grab its contents:
https://github.com/apache/flink/blob/release-1.11.2/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/PostgresRowConverter.java#L97

The stack trace is as follows:
{code:java}
[error] Caused by: java.io.IOException: Couldn't access resultSet[error]   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:266)[error]
   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:57)[error]
   at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91)[error]
   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)[error]
   at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)[error]
   at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)[error]
 Caused by: java.lang.NullPointerException[error]   at 
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$c06ce9f4$2(PostgresRowConverter.java:97)[error]
   at 
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:79)[error]
   at 
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:259)[error]
   ... 5 more {code}
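
A null-safe wrapper around the element conversion, shown as an illustrative 
sketch in plain Java (not the actual PostgresRowConverter code), would avoid 
the NPE by short-circuiting SQL NULL arrays to null:

{code:java}
import java.util.function.Function;

public final class NullSafeConversionSketch {

    // Wraps a converter so that a SQL NULL input maps to null instead of being
    // dereferenced (the dereference is what currently triggers the NPE).
    static <T, R> Function<T, R> nullSafe(Function<T, R> delegate) {
        return value -> value == null ? null : delegate.apply(value);
    }

    public static void main(String[] args) {
        // Hypothetical element converter standing in for the Postgres array conversion.
        Function<String[], String> joinArray = arr -> String.join(",", arr);
        Function<String[], String> safe = nullSafe(joinArray);

        System.out.println(safe.apply(new String[] {"a", "b"})); // a,b
        System.out.println(safe.apply(null));                    // null, no exception
    }
}
{code}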



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-22 Thread Timo Walther

Hi Jark,

"calling it "kafka-compacted" can even remind users to enable log 
compaction"


But sometimes users like to store a lineage of changes in their topics,
independent of any ktable/kstream interpretation.


I'll let the majority decide on this topic, so as not to block this effort
further. But we might find a better name like:


connector = kafka
mode = updating/inserting

OR

connector = kafka-updating

...

Regards,
Timo




On 22.10.20 15:24, Jark Wu wrote:

Hi Timo,

Thanks for your opinions.

1) Implementation
We will have a stateful operator to generate INSERT and UPDATE_BEFORE.
This operator is keyby-ed (primary key as the shuffle key) after the source
operator.
The implementation of this operator is very similar to the existing
`DeduplicateKeepLastRowFunction`.
The operator will register a value state using the primary key fields as
keys.
When the value state is empty under current key, we will emit INSERT for
the input row.
When the value state is not empty under current key, we will emit
UPDATE_BEFORE using the row in state,
and emit UPDATE_AFTER using the input row.
When the input row is DELETE, we will clear state and emit DELETE row.

2) new option vs new connector

We recently simplified the table options to a minimum amount of

characters to be as concise as possible in the DDL.
I think this is the reason why we want to introduce a new connector,
because we can simplify the options in DDL.
For example, if using a new option, the DDL may look like this:

CREATE TABLE users (
   user_id BIGINT,
   user_name STRING,
   user_level STRING,
   region STRING,
   PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
   'connector' = 'kafka',
   'model' = 'table',
   'topic' = 'pageviews_per_region',
   'properties.bootstrap.servers' = '...',
   'properties.group.id' = 'testGroup',
   'scan.startup.mode' = 'earliest',
   'key.format' = 'csv',
   'key.fields' = 'user_id',
   'value.format' = 'avro',
   'sink.partitioner' = 'hash'
);

If using a new connector, we can have different default values for the
options and remove unnecessary options; the DDL can then look like this,
which is much more concise:

CREATE TABLE pageviews_per_region (
   user_id BIGINT,
   user_name STRING,
   user_level STRING,
   region STRING,
   PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
   'connector' = 'kafka-compacted',
   'topic' = 'pageviews_per_region',
   'properties.bootstrap.servers' = '...',
   'key.format' = 'csv',
   'value.format' = 'avro'
);


When people read `connector=kafka-compacted` they might not know that it
has ktable semantics. You don't need to enable log compaction in order
to use a KTable as far as I know.

We don't need to let users know it has ktable semantics; as Konstantin
mentioned, this may carry more implicit meaning than we want to imply here.
I agree users don't need to enable log compaction, but from a production
perspective, log compaction should always be enabled if the topic is used for
this purpose. Calling it "kafka-compacted" can even remind users to enable log
compaction.

I don't agree with introducing a "model = table/stream" option, or
"connector=kafka-table", because this means we are introducing the Table vs
Stream concept from KSQL. However, we don't have such a top-level concept in
Flink SQL today; this would further confuse users.
In Flink SQL, everything is a STREAM; the differences are whether it is
bounded or unbounded, and whether it is insert-only or a changelog.


Best,
Jark


On Thu, 22 Oct 2020 at 20:39, Timo Walther  wrote:


Hi Shengkai, Hi Jark,

thanks for this great proposal. It is time to finally connect the
changelog processor with a compacted Kafka topic.

"The operator will produce INSERT rows, or additionally generate
UPDATE_BEFORE rows for the previous image, or produce DELETE rows with
all columns filled with values."

Could you elaborate a bit on the implementation details in the FLIP? How
are UPDATE_BEFOREs generated? How much state is required to perform
this operation?

  From a conceptual and semantical point of view, I'm fine with the
proposal. But I would like to share my opinion about how we expose this
feature:

ktable vs kafka-compacted

I'm against having an additional connector like `ktable` or
`kafka-compacted`. We recently simplified the table options to a minimum
amount of characters to be as concise as possible in the DDL. Therefore,
I would keep the `connector=kafka` and introduce an additional option.
Because a user wants to read "from Kafka". And the "how" should be
determined in the lower options.

When people read `connector=ktable` they might not know that this is
Kafka. Or they wonder where `kstream` is?

When people read `connector=kafka-compacted` they might not know that it
has ktable semantics. You don't need to enable log compaction in order
to use a KTable as far as I know. Log compaction and table semantics are
orthogonal topics.

In the end we will need 3 types of information when declaring a Kafka
connector:

CREATE TABLE ... WITH (
connector=kafka   

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-22 Thread Chesnay Schepler
The existing ContinuousFileMonitoringFunction and 
ContinuousFileReaderOperator already take care of that.
Unless you are re-implementing them from scratch, you shouldn't have
to do anything.
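
For the checkpointing part itself, a minimal sketch (values illustrative) is
to enable it on the execution environment; the built-in file source then
snapshots its split/offset state with every checkpoint:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a checkpoint every 60 seconds with exactly-once guarantees.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
        // ... build the file-monitoring source, transformation, and Kafka sink here ...
        // env.execute("s3-to-kafka");  // call once the pipeline above is in place
    }
}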


On 10/22/2020 1:47 PM, Satyaa Dixit wrote:

Hi Chesnay,
Thanks for your support. It helped a lot. I need one more piece of help on
how to do checkpointing as part of the S3 reader source, in case some failure
happens (for example an OutOfMemoryError, or any other failure), so that when
the job restarts it can recover from the last reader split offset and continue
from where the previous job left off, in order to avoid duplicate data.

Thanks,
Satya

On Thu, Oct 15, 2020 at 3:29 PM Chesnay Schepler  wrote:


hmm...I don't see an easy way.
You may have to replicate StreamExecutionEnvironment#createFileInput and
create a custom ContinuousFileMonitoringFunction that ignores missing
files in its run() method.

Alternatively, use some library to check the existence of the S3
directories before creating the sources.
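
As a sketch of that second suggestion (the existence check is a placeholder
for whatever S3 client call you use, e.g. from the AWS SDK):

import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public final class FilterMissingDirectoriesSketch {

    // Drops directories that do not exist before any Flink source is created,
    // so the job never tries to read a missing path.
    static List<String> keepExisting(List<String> s3Paths, Predicate<String> exists) {
        return s3Paths.stream()
                .filter(exists)
                .collect(Collectors.toList());
    }
}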

On 10/15/2020 11:49 AM, Satyaa Dixit wrote:

Hi Chesnay/Team,

Thanks, we got the fix for our problem but got stuck with the below issue;
request your support.

How to catch a FileNotFoundException during runtime, if any directory is
missing in S3, as part of the below source code, to avoid job failure:


s3PathList.stream().map(directory ->
S3Service.createInputStream(environment, directory, readerParallelism))
.reduce(DataStream::union).map(joinedStream ->
joinedStream.addSink(kafkaProducer));





Regards,

Satya

On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit  wrote:

Hi Chesnay/Team,

Thank you so much. I have tried the solution but it is not working as
expected; it shows compilation issues, and I have tried all the ways I can
think of. Please find the code snippet below:

s3PathList.stream()
.map(directory -> S3Service.customCreateInputStream(environment,
directory, readerParallelism))
.reduce(DataStream::union).map(joinedStream -> stream.flatMap(new
IntermidiateOperator()).map(joinedStream ->
joinedStream.addSink(kafkaProducer).name("Publish to " +

kafkaTopicName));

public static class IntermidiateOperator implements
FlatMapFunction {
private static final ObjectMapper objectMapper1 = new ObjectMapper();

@Override
public void flatMap(String value, Collector out) throws

Exception {

Test m = objectMapper1.readValue(value, Test.class);
System.out.println("Json string:: --" + m);
// logger.info("Json string:: --"+m);
out.collect(value);
}
}

Also, just to clarify one doubt: how do we handle a *FileNotFoundException*
as part of the Flink reader during runtime if a directory is not available
in S3? How do we avoid job failure in that case?

Regards,
Satya

On Tue, Oct 13, 2020 at 11:15 AM Satyaa Dixit 
wrote:


Thanks, I'll check it out.

On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler 
wrote:


1) There's no mechanism in the API to restrict the number of readers
across several sources. I can't quite think of a way to achieve this;
maybe Kostas has an idea.

2) You're mixing up the Java Streams and Flink's DataStream API.

Try this:

s3PathList.stream()
.map(...)
.reduce(...)
.map(joinedStream -> stream.map(new FlatMapFunction...))
.map(joinedStream->  joinedStream.addSink...)
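
A filled-in version of that chain, sketched with stand-ins for the thread's
own helpers (the source factory plays the role of S3Service.createInputStream
and the flat map the role of IntermidiateOperator), could look like:

import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class UnionPipelineSketch {

    /** Stand-in for S3Service.createInputStream(environment, directory, readerParallelism). */
    public interface SourceFactory {
        DataStream<String> create(StreamExecutionEnvironment env, String directory);
    }

    // Java Streams builds one Flink source per directory; the Flink DataStream API
    // unions them, applies the transformation, and attaches the Kafka sink.
    public static void wire(
            StreamExecutionEnvironment environment,
            List<String> s3PathList,
            SourceFactory sourceFactory,
            FlatMapFunction<String, String> transform,   // a concrete class (like IntermidiateOperator) so Flink can infer the output type
            SinkFunction<String> kafkaProducer,
            String kafkaTopicName) {

        s3PathList.stream()
                .map(directory -> sourceFactory.create(environment, directory))
                .reduce(DataStream::union)
                .ifPresent(all -> all
                        .flatMap(transform)              // Flink transformation, not a Java Streams map
                        .addSink(kafkaProducer)
                        .name("Publish to " + kafkaTopicName));
    }
}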

On 10/12/2020 6:05 AM, Satyaa Dixit wrote:

Hi Team,

Could you please help me here? I'm sorry for asking on such short notice,
but my work has stopped due to this.


Regards,
Satya

On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit 
wrote:


Hi Chesnay/Team,

Thank you so much for the reply. In continuation of the previous email, below
is the block diagram where I am reading files from S3 and pushing them to
Kafka. With the current setup I have 4 directories in total; based on the
readFile method from the Flink environment, we create 4 readers in parallel
to process the data from S3.

Below are my questions:
1. Can we restrict the number of readers that process the data in parallel?
E.g. let's say we have a thousand directories; in that case I want to restrict
the number of readers to 10, and the ten parallel threads will each continue
with 100 sequential reads of directories to consume the data.

2. In between the two Flink operators, i.e. the S3 reader and the Kafka sink,
I want to implement one more operator in order to transform the data which I
am reading from the S3 bucket before pushing it into the Kafka sink. Below is
my working code. Here I am finding it difficult to implement a map operator to
transform the union of data streams (built by applying the union method over
each directory's reader) before pushing to Kafka.

List s3PathList = S3Service.getListOfS3Paths(finalParameters);

s3PathList.stream()
.map(directory -> S3Service.customInputStream(environment, directory,
readerParallelism))
.reduce(DataStream::union)
.map(joinedStream ->
joinedStream.addSink(kafkaProducer).name("Publish to " + kafkaTopicName));


*Something like this I'm trying to do in order to achieve the above use
case by applying 

Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-22 Thread Jark Wu
Hi Timo,

Thanks for your opinions.

1) Implementation
We will have a stateful operator to generate INSERT and UPDATE_BEFORE.
This operator is keyed by the primary key (as the shuffle key) after the
source operator.
The implementation of this operator is very similar to the existing
`DeduplicateKeepLastRowFunction`.
The operator will register a value state using the primary key fields as keys.
When the value state is empty under the current key, we will emit INSERT for
the input row.
When the value state is not empty under the current key, we will emit
UPDATE_BEFORE using the row in state, and emit UPDATE_AFTER using the input row.
When the input row is DELETE, we will clear the state and emit a DELETE row.
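
A minimal sketch of such an operator (illustrative only; it uses the public
Row/RowKind and KeyedProcessFunction APIs rather than the internal RowData
types and the actual DeduplicateKeepLastRowFunction) could look like:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

// Keyed by the primary-key fields; turns an upsert stream into a full changelog.
public class UpsertToChangelogSketch extends KeyedProcessFunction<Row, Row, Row> {

    private transient ValueState<Row> lastValue;

    @Override
    public void open(Configuration parameters) {
        lastValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-value", Row.class));
    }

    @Override
    public void processElement(Row input, Context ctx, Collector<Row> out) throws Exception {
        if (input.getKind() == RowKind.DELETE) {
            // DELETE: clear the state and forward the DELETE row.
            lastValue.clear();
            out.collect(input);
            return;
        }
        Row previous = lastValue.value();
        if (previous == null) {
            // Nothing stored under the current key yet: emit INSERT.
            input.setKind(RowKind.INSERT);
        } else {
            // State exists: emit UPDATE_BEFORE from the stored row, then UPDATE_AFTER.
            previous.setKind(RowKind.UPDATE_BEFORE);
            out.collect(previous);
            input.setKind(RowKind.UPDATE_AFTER);
        }
        out.collect(input);
        lastValue.update(input);
    }
}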

2) new option vs new connector
> We recently simplified the table options to a minimum amount of
characters to be as concise as possible in the DDL.
I think this is the reason why we want to introduce a new connector,
because we can simplify the options in DDL.
For example, if using a new option, the DDL may look like this:

CREATE TABLE users (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'model' = 'table',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest',
  'key.format' = 'csv',
  'key.fields' = 'user_id',
  'value.format' = 'avro',
  'sink.partitioner' = 'hash'
);

If using a new connector, we can have different default values for the
options and remove unnecessary options; the DDL can then look like this,
which is much more concise:

CREATE TABLE pageviews_per_region (
  user_id BIGINT,
  user_name STRING,
  user_level STRING,
  region STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka-compacted',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);

> When people read `connector=kafka-compacted` they might not know that it
> has ktable semantics. You don't need to enable log compaction in order
> to use a KTable as far as I know.
We don't need to let users know it has ktable semantics; as Konstantin
mentioned, this may carry more implicit meaning than we want to imply here.
I agree users don't need to enable log compaction, but from a production
perspective, log compaction should always be enabled if the topic is used for
this purpose. Calling it "kafka-compacted" can even remind users to enable log
compaction.

I don't agree with introducing a "model = table/stream" option, or
"connector=kafka-table", because this means we are introducing the Table vs
Stream concept from KSQL. However, we don't have such a top-level concept in
Flink SQL today; this would further confuse users.
In Flink SQL, everything is a STREAM; the differences are whether it is
bounded or unbounded, and whether it is insert-only or a changelog.


Best,
Jark


On Thu, 22 Oct 2020 at 20:39, Timo Walther  wrote:

> Hi Shengkai, Hi Jark,
>
> thanks for this great proposal. It is time to finally connect the
> changelog processor with a compacted Kafka topic.
>
> "The operator will produce INSERT rows, or additionally generate
> UPDATE_BEFORE rows for the previous image, or produce DELETE rows with
> all columns filled with values."
>
> Could you elaborate a bit on the implementation details in the FLIP? How
> are UPDATE_BEFOREs generated? How much state is required to perform
> this operation?
>
>  From a conceptual and semantical point of view, I'm fine with the
> proposal. But I would like to share my opinion about how we expose this
> feature:
>
> ktable vs kafka-compacted
>
> I'm against having an additional connector like `ktable` or
> `kafka-compacted`. We recently simplified the table options to a minimum
> amount of characters to be as concise as possible in the DDL. Therefore,
> I would keep the `connector=kafka` and introduce an additional option.
> Because a user wants to read "from Kafka". And the "how" should be
> determined in the lower options.
>
> When people read `connector=ktable` they might not know that this is
> Kafka. Or they wonder where `kstream` is?
>
> When people read `connector=kafka-compacted` they might not know that it
> has ktable semantics. You don't need to enable log compaction in order
> to use a KTable as far as I know. Log compaction and table semantics are
> orthogonal topics.
>
> In the end we will need 3 types of information when declaring a Kafka
> connector:
>
> CREATE TABLE ... WITH (
>connector=kafka-- Some information about the connector
>end-offset =   -- Some information about the boundedness
>model = table/stream   -- Some information about interpretation
> )
>
>
> We can still apply all the constraints mentioned in the FLIP. When
> `model` is set to `table`.
>
> What do you think?
>
> Regards,
> Timo
>
>
> On 21.10.20 14:19, Jark Wu wrote:
> > Hi,
> >
> > IMO, if we are going to mix them in 

[jira] [Created] (FLINK-19770) mvn clean verify - testConfigurePythonExecution failing

2020-10-22 Thread Juha Mynttinen (Jira)
Juha Mynttinen created FLINK-19770:
--

 Summary: mvn clean verify - testConfigurePythonExecution failing
 Key: FLINK-19770
 URL: https://issues.apache.org/jira/browse/FLINK-19770
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.11.2
Reporter: Juha Mynttinen


The PR [https://github.com/apache/flink/pull/13322] recently added the test
method testConfigurePythonExecution in
org.apache.flink.client.cli.PythonProgramOptionsTest.

"mvn clean verify" fails for me in testConfigurePythonExecution:
 
...
INFO] Running org.apache.flink.client.cli.PythonProgramOptionsTest
[ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.433 s 
<<< FAILURE! - in org.apache.flink.client.cli.PythonProgramOptionsTest
[ERROR] 
testConfigurePythonExecution(org.apache.flink.client.cli.PythonProgramOptionsTest)
  Time elapsed: 0.019 s  <<< ERROR!
java.nio.file.NoSuchFileException: target/dummy-job-jar
at 
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at 
java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
at 
java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:149)
at 
java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
at java.base/java.nio.file.Files.readAttributes(Files.java:1763)
at java.base/java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219)
at java.base/java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276)
at java.base/java.nio.file.FileTreeWalker.walk(FileTreeWalker.java:322)
at java.base/java.nio.file.Files.walkFileTree(Files.java:2716)
at java.base/java.nio.file.Files.walkFileTree(Files.java:2796)
at 
org.apache.flink.client.cli.PythonProgramOptionsTest.testConfigurePythonExecution(PythonProgramOptionsTest.java:131)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runners.Suite.runChild(Suite.java:128)
at org.junit.runners.Suite.runChild(Suite.java:27)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.apache.maven.surefire.junitcore.JUnitCore.run(JUnitCore.java:55)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:137)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:107)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:83)
at 
org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:75)
at 
org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:158)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
...
[ERROR] Errors:
[ERROR]   

[jira] [Created] (FLINK-19769) Reuse StreamRecord in SourceOutputWithWatermarks#collect

2020-10-22 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-19769:
---

 Summary: Reuse StreamRecord in SourceOutputWithWatermarks#collect
 Key: FLINK-19769
 URL: https://issues.apache.org/jira/browse/FLINK-19769
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Caizhi Weng
 Fix For: 1.12.0


{{SourceOutputWithWatermarks#collect}} always creates a new {{StreamRecord}} 
object which can be reused quite easily. We should reuse the {{StreamRecord}} 
for optimization.
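
An illustrative sketch of the reuse pattern in plain Java (not the actual 
SourceOutputWithWatermarks code): keep one mutable holder and overwrite it 
per element instead of allocating a new wrapper on every collect() call.

{code:java}
import java.util.function.Consumer;

final class ReusingCollectorSketch<T> {

    // Stand-in for Flink's StreamRecord: a mutable (value, timestamp) holder.
    static final class Record<V> {
        V value;
        long timestamp;
    }

    private final Record<T> reusable = new Record<>();
    private final Consumer<Record<T>> downstream;

    ReusingCollectorSketch(Consumer<Record<T>> downstream) {
        this.downstream = downstream;
    }

    void collect(T value, long timestamp) {
        // Overwrite the single instance rather than creating a new Record per element.
        reusable.value = value;
        reusable.timestamp = timestamp;
        downstream.accept(reusable);
    }
}
{code}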



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-149: Introduce the KTable Connector

2020-10-22 Thread Timo Walther

Hi Shengkai, Hi Jark,

thanks for this great proposal. It is time to finally connect the 
changelog processor with a compacted Kafka topic.


"The operator will produce INSERT rows, or additionally generate 
UPDATE_BEFORE rows for the previous image, or produce DELETE rows with 
all columns filled with values."


Could you elaborate a bit on the implementation details in the FLIP? How
are UPDATE_BEFOREs generated? How much state is required to perform
this operation?


From a conceptual and semantical point of view, I'm fine with the 
proposal. But I would like to share my opinion about how we expose this 
feature:


ktable vs kafka-compacted

I'm against having an additional connector like `ktable` or 
`kafka-compacted`. We recently simplified the table options to a minimum 
amount of characters to be as concise as possible in the DDL. Therefore, 
I would keep the `connector=kafka` and introduce an additional option. 
Because a user wants to read "from Kafka". And the "how" should be 
determined in the lower options.


When people read `connector=ktable` they might not know that this is 
Kafka. Or they wonder where `kstream` is?


When people read `connector=kafka-compacted` they might not know that it 
has ktable semantics. You don't need to enable log compaction in order 
to use a KTable as far as I know. Log compaction and table semantics are 
orthogonal topics.


In the end we will need 3 types of information when declaring a Kafka 
connector:


CREATE TABLE ... WITH (
  connector=kafka-- Some information about the connector
  end-offset =   -- Some information about the boundedness
  model = table/stream   -- Some information about interpretation
)


We can still apply all the constraints mentioned in the FLIP. When 
`model` is set to `table`.


What do you think?

Regards,
Timo


On 21.10.20 14:19, Jark Wu wrote:

Hi,

IMO, if we are going to mix them in one connector,
1) either users need to set some options to a specific value explicitly,
e.g. "scan.startup.mode=earliest", "sink.partitioner=hash", etc..
This makes the connector awkward to use; users may have to fix options one
by one according to the exceptions.
Besides, in the future it is still possible to use "sink.partitioner=fixed"
(to reduce network cost) if users are aware of the partition routing;
however, it's error-prone to have "fixed" as the default for compacted mode.

2) or we give those options different default values when "compacted=true".
This would be more confusing and unpredictable, since the default values of
options would change according to other options.
What happens if we have a third mode in the future?

In terms of usage and options, it's very different from the
original "kafka" connector.
It would be handier to use and less error-prone if we separate them into two
connectors.
In the implementation layer, we can reuse code as much as possible.

Therefore, I'm still +1 to have a new connector.
The "kafka-compacted" name sounds good to me.

Best,
Jark


On Wed, 21 Oct 2020 at 17:58, Konstantin Knauf  wrote:


Hi Kurt, Hi Shengkai,

thanks for answering my questions and the additional clarifications. I
don't have a strong opinion on whether to extend the "kafka" connector or
to introduce a new connector. So, from my perspective feel free to go with
a separate connector. If we do introduce a new connector I wouldn't call it
"ktable" for aforementioned reasons (In addition, we might suggest that
there is also a "kstreams" connector for symmetry reasons). I don't have a
good alternative name, though, maybe "kafka-compacted" or
"compacted-kafka".

Thanks,

Konstantin


On Wed, Oct 21, 2020 at 4:43 AM Kurt Young  wrote:


Hi all,

I want to describe the discussion process which drove us to such a
conclusion; this might make some of the design choices easier to understand
and keep everyone on the same page.


Back to the motivation: what functionality do we want to provide in the
first place? We got a lot of feedback and questions on the mailing lists that
people want to write not-insert-only messages into Kafka. That might be
intentional or by accident, e.g. from a non-windowed aggregate query or a
non-windowed left outer join. And some users from the KSQL world also asked
why Flink doesn't leverage the key concept of every Kafka topic and treat
Kafka as a dynamically changing keyed table.

To work with Kafka better, we were thinking of extending the functionality of
the current Kafka connector by letting it accept updates and deletions. But
due to the limitations of Kafka, the update has to be "update by key", i.e. a
table with a primary key.

This introduces a couple of conflicts with the current Kafka table's options:
1. key.fields: as said above, we need the Kafka table to have the primary key
constraint, and users can also configure key.fields freely, which might cause
friction. (Sure, we can do some sanity checking on this, but that also creates
friction.)
2. sink.partitioner: to make the semantics right, we need to make sure all 

Re: Need help in creating Flink Streaming s3 Job for multiple path reader one by one

2020-10-22 Thread Satyaa Dixit
Hi Chesnay,
Thanks for your support. It helped a lot. I need one more piece of help on
how to do checkpointing as part of the S3 reader source, in case some failure
happens (for example an OutOfMemoryError, or any other failure), so that when
the job restarts it can recover from the last reader split offset and continue
from where the previous job left off, in order to avoid duplicate data.

Thanks,
Satya

On Thu, Oct 15, 2020 at 3:29 PM Chesnay Schepler  wrote:

> hmm...I don't see an easy way.
> You may have to replicate StreamExecutionEnvironment#createFileInput and
> create a custom ContinuousFileMonitoringFunction that ignores missing
> files in its run() method.
>
> Alternatively, use some library to check the existence of the S3
> directories before creating the sources.
>
> On 10/15/2020 11:49 AM, Satyaa Dixit wrote:
> > Hi Chesnay/Team,
> >
> > Thanks, we got the fix for our problem but got stuck with the below
> > issue; request your support.
> >
> >
> > How to catch a FileNotFoundException during runtime, if any directory is
> > missing in S3, as part of the below source code, to avoid job failure:
> >
> >
> > s3PathList.stream().map(directory ->
> > S3Service.createInputStream(environment, directory, readerParallelism))
> > .reduce(DataStream::union).map(joinedStream ->
> > joinedStream.addSink(kafkaProducer));
> >
> >
> >
> >
> >
> > Regards,
> >
> > Satya
> >
> > On Wed, Oct 14, 2020 at 8:57 PM Satyaa Dixit 
> wrote:
> >
> >> Hi Chesnay/Team
> >>
> >> Thank you so much. I have tried the solution but it is not working as
> >> expected; it shows compilation issues, and I have tried all the ways I
> >> can think of. Please find the code snippet below:
> >>
> >> s3PathList.stream()
> >> .map(directory -> S3Service.customCreateInputStream(environment,
> >> directory, readerParallelism))
> >> .reduce(DataStream::union).map(joinedStream -> stream.flatMap(new
> >> IntermidiateOperator()).map(joinedStream ->
> >> joinedStream.addSink(kafkaProducer).name("Publish to " +
> kafkaTopicName));
> >>
> >> public static class IntermidiateOperator implements
> >> FlatMapFunction {
> >> private static final ObjectMapper objectMapper1 = new ObjectMapper();
> >>
> >> @Override
> >> public void flatMap(String value, Collector out) throws
> Exception {
> >> Test m = objectMapper1.readValue(value, Test.class);
> >> System.out.println("Json string:: --" + m);
> >> // logger.info("Json string:: --"+m);
> >> out.collect(value);
> >> }
> >> }
> >>
> >> Also, just to clarify one doubt: how do we handle a *FileNotFoundException*
> >> as part of the Flink reader during runtime if a directory is not available
> >> in S3? How do we avoid job failure in that case?
> >>
> >> Regards,
> >> Satya
> >>
> >> On Tue, Oct 13, 2020 at 11:15 AM Satyaa Dixit 
> >> wrote:
> >>
> >>> Thanks, I'll check it out.
> >>>
> >>> On Mon, Oct 12, 2020 at 1:29 PM Chesnay Schepler 
> >>> wrote:
> >>>
>  1) There's no mechanism in the API to restrict the number of readers
>  across several sources. I can't quite think of a way to achieve this;
>  maybe Kostas has an idea.
> 
>  2) You're mixing up the Java Streams and Flink's DataStream API.
> 
>  Try this:
> 
>  s3PathList.stream()
>  .map(...)
>  .reduce(...)
>  .map(joinedStream -> stream.map(new FlatMapFunction...))
>  .map(joinedStream->  joinedStream.addSink...)
> 
>  On 10/12/2020 6:05 AM, Satyaa Dixit wrote:
> 
>  Hi Team,
> 
>  Could you please help me here? I'm sorry for asking on such short notice,
>  but my work has stopped due to this.
> 
> 
>  Regards,
>  Satya
> 
>  On Fri, 9 Oct 2020 at 8:53 PM, Satyaa Dixit 
>  wrote:
> 
> > Hi Chesnay/Team,
> >
> > Thank you so much for the reply. In continuation of the previous email,
> > below is the block diagram where I am reading files from S3 and pushing
> > them to Kafka. With the current setup I have 4 directories in total; based
> > on the readFile method from the Flink environment, we create 4 readers in
> > parallel to process the data from S3.
> >
> > Below are my questions:
> > 1. Can we restrict the number of readers that process the data in
> > parallel? E.g. let's say we have a thousand directories; in that case I
> > want to restrict the number of readers to 10, and the ten parallel threads
> > will each continue with 100 sequential reads of directories to consume the
> > data.
> >
> > 2. In between the two Flink operators, i.e. the S3 reader and the Kafka
> > sink, I want to implement one more operator in order to transform the data
> > which I am reading from the S3 bucket before pushing it into the Kafka
> > sink. Below is my working code. Here I am finding it difficult to
> > implement a map operator to transform the union of data streams, built by
> > applying the union method over each directory's reader, before pushing 

[VOTE] NEW FLIP-102: Add More Metrics to TaskManager

2020-10-22 Thread Yadong Xie
Hi all

I want to start a new vote for FLIP-102, which proposes to add more metrics
to the task manager in web UI.

The new FLIP-102 was revisited and adapted following the old ML discussion.

Thanks to Matthias's and Lining's efforts, more metrics are available. We can
match most of the effective configuration to the metrics, just as the Flink
documentation now describes.

The vote will last for at least 72 hours, following the consensus voting.


FLIP-102 wiki:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-102%3A+Add+More+Metrics+to+TaskManager


Discussion thread:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-102-Add-More-Metrics-to-TaskManager-td37898.html
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-75-Flink-Web-UI-Improvement-Proposal-td33540.html

Thanks,

Yadong


Re: [VOTE] FLIP-102: Add More Metrics to TaskManager

2020-10-22 Thread Yadong Xie
Hi all

there have been lots of discussions since the vote started, and many
suggestions have been made.

Matthias and I have updated FLIP-102 following the suggestions and
discussions.

I want to cancel the vote here and start a new one. Thanks

Matthias Pohl  wrote on Friday, August 21, 2020 at 3:33 AM:

> Good points, Andrey. Thanks for the clarification. I made some minor
> adaptations to the FLIP now:
> - Renamed the `resource` member to `configuration` and made it a top-level
> member besides `metrics` and `hardware`, since it doesn't fit the volatile
> metrics context that well.
> - I restructured the table under Proposed Changes to cover Metaspace now.
> Additionally, I renamed `shuffle` to `network` to match the memory model
> of FLIP-49.
> - The UI in the screenshot has a bug: the counts of Direct and Mapped are
> accompanied by a memory unit even though they are plain counts.
>
> On Thu, Aug 20, 2020 at 4:10 PM Andrey Zagrebin 
> wrote:
>
> > Hi All,
> >
> > Thanks for reviving the discussion, Matthias!
> >
> > This would mean that we could adapt the current proposal to replace the
> > > Nonheap usage pane by a pane displaying the Metaspace usage.
> > >
> > I do not know the value of having the Nonheap usage in metrics. I can see
> > that the Metaspace metric can be interesting for users to debug OOMs.
> > We had the Nonheap usage before, so as discussed, I would be a bit careful
> > about removing it. I believe it deserves a separate poll on the user ML
> > as to whether the Nonheap usage is useless or not.
> > As a current solution, we could keep both or merge them into one box with
> > a slash, like Metaspace/Nonheap -> 5Mb/10Mb, if the majority agrees that
> > this is not confusing and it is clear that Metaspace is a part of Nonheap.
> >
>
> That would be a good solution representing both metrics. I adapted the
> table in FLIP-102's Confluence accordingly for now to have it visualized.
> Let's see what others are thinking about it.
>
>
> >
> > Btw, the "Nonheap" in the configuration box of the current FLIP-102 is
> > probably incorrect or confusing as it does not one-to-one correspond to
> > the Nonheap JVM metric.
> >
> > > The only issue I see is that JVM Overhead would still not be represented
> > > in the memory usage overview.
> >
> > My understanding is that we do not need a usage metric for JVM Overhead,
> > as it is a virtual unmanaged component which is more about configuring the
> > max total process memory.
> >
> > > Is there a reason for us to introduce a nested structure
> > > TaskManagerMetricsInfo in the response object? I would rather keep it
> > > consistent in a flat structure instead, i.e. have all the members of
> > > TaskManagerResourceInfo be members of TaskManagerMetricsInfo.
> >
> > I would suggest introducing a separate REST call for
> > TaskManagerResourceInfo.
> > Semantically, TaskManagerResourceInfo is more about the TM configuration
> > and it is not directly related to the usage metrics.
> > In the future, I would avoid having calls with many responsibilities and
> > maybe consider splitting the 'TM details' call into metrics etc., unless
> > there is a concern about having to do more calls instead of one from the UI.
> >
>
> Good point. The growing size of the JSON response record might make it
> worth splitting it up into different endpoints serving different groups of
> data (e.g. /metrics for volatile values and /configuration for static
> ones).
>
>
> >
> > > Alternatively, one could think of grouping the metrics collecting the
> > > different values (i.e. max, used, committed) per metric in a JSON object.
> > > But this would apply for all the other metrics of TaskManagerMetricsInfo
> > > as well.
> >
> > I would personally prefer this for metrics but I am not pushing for this.
> >
> > > metrics.resource.managedMemory and metrics.resource.networkMemory have
> > > counterparts in metrics.networkMemory[Used|Total] and
> > > metrics.managedMemory[Used|Total]: is this redundant data or do they have
> > > different semantics?
> >
> > As I understand it, they have different semantics. The latter is about
> > configuration, the former is about current usage metrics.
> >
>
> I see. Makes sense.
>
> >
> > > Is metrics.resource.totalProcessMemory a basic sum over all provided
> > > values?
> >
> > This is again about configuration; I do not think it makes sense to come
> > up with a usage metric for the totalProcessMemory component.
> >
>
> Got it.
>
>
> > Best,
> > Andrey
> >
> >
> > On Thu, Aug 20, 2020 at 9:06 AM Matthias  wrote:
> >
> > > Hi Jing,
> > > I recently joined Ververica and started looking into FLIP-102. I'm
> > > trying to figure out how we would implement the proposal on the backend
> > > side. I looked into the proposal for the REST API response and a few
> > > questions popped up:
> > > - Is there a reason for us to introduce a nested structure
> > > 

[jira] [Created] (FLINK-19768) The shell "./yarn-session.sh " not use log4j-session.properties , it use log4j.properties

2020-10-22 Thread YUJIANBO (Jira)
YUJIANBO created FLINK-19768:


 Summary: The shell  "./yarn-session.sh " not use 
log4j-session.properties , it use log4j.properties
 Key: FLINK-19768
 URL: https://issues.apache.org/jira/browse/FLINK-19768
 Project: Flink
  Issue Type: Bug
  Components: Client / Job Submission
Affects Versions: 1.11.2
Reporter: YUJIANBO


The shell "./yarn-session.sh" does not use log4j-session.properties; it uses
log4j.properties.

My Flink job UI shows that $internal.yarn.log-config-file is
"/usr/local/flink-1.11.2/conf/log4j.properties". Is this a bug?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19767) Add AbstractSlotPoolFactory

2020-10-22 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-19767:


 Summary: Add AbstractSlotPoolFactory
 Key: FLINK-19767
 URL: https://issues.apache.org/jira/browse/FLINK-19767
 Project: Flink
  Issue Type: Task
  Components: Runtime / Coordination
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.12.0


FLINK-19314 introduces another slot pool implementation, with the factory for
it being a carbon copy of the factory for the existing slot pool.

We can introduce an abstract factory class to reduce code duplication.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19766) Introduce File streaming compaction operators

2020-10-22 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-19766:


 Summary: Introduce File streaming compaction operators
 Key: FLINK-19766
 URL: https://issues.apache.org/jira/browse/FLINK-19766
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Runtime
Reporter: Jingsong Lee
Assignee: Jingsong Lee
 Fix For: 1.12.0


Introduce CompactCoordinator and CompactOperator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19765) flink SqlUseCatalog.getCatalogName is not unified with SqlCreateCatalog and SqlDropCatalog

2020-10-22 Thread jackylau (Jira)
jackylau created FLINK-19765:


 Summary: flink SqlUseCatalog.getCatalogName is not unified with 
SqlCreateCatalog and SqlDropCatalog
 Key: FLINK-19765
 URL: https://issues.apache.org/jira/browse/FLINK-19765
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.11.0
Reporter: jackylau
 Fix For: 1.12.0


While developing a Flink Ranger plugin at the operation level, I found that
this method is not unified with the others.

Also, SqlToOperationConverter.convert needs a good ordering so that users can
find the relevant code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19764) Add More Metrics to TaskManager in Web UI

2020-10-22 Thread Yadong Xie (Jira)
Yadong Xie created FLINK-19764:
--

 Summary: Add More Metrics to TaskManager in Web UI
 Key: FLINK-19764
 URL: https://issues.apache.org/jira/browse/FLINK-19764
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Reporter: Yadong Xie


Update the UI now that https://issues.apache.org/jira/browse/FLINK-14422 and
https://issues.apache.org/jira/browse/FLINK-14406 have been fixed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19763) Missing test MetricUtilsTest.testNonHeapMetricUsageNotStatic

2020-10-22 Thread Matthias (Jira)
Matthias created FLINK-19763:


 Summary: Missing test 
MetricUtilsTest.testNonHeapMetricUsageNotStatic
 Key: FLINK-19763
 URL: https://issues.apache.org/jira/browse/FLINK-19763
 Project: Flink
  Issue Type: Test
Affects Versions: 1.11.2, 1.10.2
Reporter: Matthias
 Fix For: 1.12.0


We have tests for the heap and metaspace to check whether the metric is 
dynamically generated. The test for the non-heap space is missing. There was a 
test added in [296107e|https://github.com/apache/flink/commit/296107e] but 
reverted in [2d86256|https://github.com/apache/flink/commit/2d86256] as it 
appeared that the test is partially failing.

We might want to add the test again, fixing the issue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19762) Selecting Job-ID in web UI covers more than the ID

2020-10-22 Thread Matthias (Jira)
Matthias created FLINK-19762:


 Summary: Selecting Job-ID in web UI covers more than the ID
 Key: FLINK-19762
 URL: https://issues.apache.org/jira/browse/FLINK-19762
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.11.2, 1.10.2
Reporter: Matthias
 Fix For: 1.12.0
 Attachments: Screenshot 2020-10-22 at 09.47.41.png

More than just the ID is selected when trying to copy the job ID from the web
UI by double-clicking it. See the attached screenshot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19761) Add lookup method for registered ShuffleDescriptor in ShuffleMaster

2020-10-22 Thread Xuannan Su (Jira)
Xuannan Su created FLINK-19761:
--

 Summary: Add lookup method for registered ShuffleDescriptor in 
ShuffleMaster
 Key: FLINK-19761
 URL: https://issues.apache.org/jira/browse/FLINK-19761
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Xuannan Su


Currently, the ShuffleMaster can register a partition and get the shuffle
descriptor. However, it lacks the ability to look up the registered
ShuffleDescriptors belonging to an IntermediateResult by the
IntermediateDataSetID.
Adding a lookup method to the ShuffleMaster makes reusing a cluster partition
easier. For example, we don't have to return the ShuffleDescriptor to the
client just so that another job can somehow encode the ShuffleDescriptor in
its JobGraph to consume the cluster partition. Instead, we only need to return
the IntermediateDataSetID, and another job can use it to look up the
ShuffleDescriptor.
With a lookup method in the ShuffleMaster, if we have an external shuffle
service and the lifecycle of the IntermediateResult is not bound to the
cluster, a job running on another cluster can look up the ShuffleDescriptor
and reuse the IntermediateResult even if the cluster that produced it has been
shut down.
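
An illustrative sketch of the proposed lookup shape (the types are stand-ins,
not the actual ShuffleMaster API):

{code:java}
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: K plays the role of IntermediateDataSetID and D the role
// of the registered ShuffleDescriptor(s) for that intermediate result.
final class ShuffleDescriptorRegistrySketch<K, D> {

    private final Map<K, D> registered = new ConcurrentHashMap<>();

    void register(K dataSetId, D descriptor) {
        registered.put(dataSetId, descriptor);
    }

    // The lookup this ticket proposes: resolve descriptors by the data set id alone,
    // so a consuming job only needs the id, not the descriptor itself.
    Optional<D> lookup(K dataSetId) {
        return Optional.ofNullable(registered.get(dataSetId));
    }
}
{code}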



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-19760) Make the `GlobalCommitter` a standalone interface that does not extend the `Committer`

2020-10-22 Thread Guowei Ma (Jira)
Guowei Ma created FLINK-19760:
-

 Summary: Make the `GlobalCommitter` a standalone interface that 
does not extend the `Committer`
 Key: FLINK-19760
 URL: https://issues.apache.org/jira/browse/FLINK-19760
 Project: Flink
  Issue Type: Sub-task
Reporter: Guowei Ma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)