Flink 1.9 Sql Rowtime Error

2019-11-01 Thread Polarisary
Hi All:
I have defined a Kafka connector descriptor and registered the table:

tEnv.connect(new Kafka()
.version("universal")
.topic(tableName)
.startFromEarliest()
.property("zookeeper.connect", “xxx")
.property("bootstrap.servers", “xxx")
.property("group.id", “xxx"))
.withFormat(new Json().deriveSchema())
.withSchema(new Schema()
.field("rowtime", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime()
.timestampsFromField("createTime")
.watermarksPeriodicBounded(300_000))
.field("data", Types.ROW(dataFieldTypes)))
.inAppendMode().registerTableSource(tableName);


The Kafka input is:
{
   "data": [
  18140781,
   ],
   "createTime": 1572577137596
}
Exception as follows:
Caused by: java.io.IOException: Failed to deserialize JSON object.
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
Caused by: java.time.format.DateTimeParseException: Text '1553080631582' could 
not be parsed at index 0
at 
java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
... 7 more

polaris...@gmail.com






Re: Flink 1.9 Sql Rowtime Error

2019-11-01 Thread OpenInx
Hi Polarisary.

Checked the Flink codebase and your stack traces; it seems you need to format
the timestamp as "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'".

The code is here:
https://github.com/apache/flink/blob/38e4e2b8f9bc63a793a2bddef5a578e3f80b7376/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java#L340
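If you control the producer, one workaround (a sketch added here for illustration, not from the original thread; the class name is made up) is to write createTime as a string in exactly that pattern instead of epoch milliseconds:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class Rfc3339Timestamps {

    // The pattern the JSON format expects for SQL_TIMESTAMP fields (see the link above).
    private static final DateTimeFormatter RFC3339 =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

    public static String format(long epochMillis) {
        return RFC3339.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // Prints the UTC RFC 3339 representation of the createTime from the question.
        System.out.println(format(1572577137596L));
    }
}
```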

On Fri, Nov 1, 2019 at 3:50 PM Polarisary  wrote:

> Hi All:
> I have define kafka connector Descriptor, and registe Table
>
> tEnv.connect(new Kafka()
> .version("universal")
> .topic(tableName)
> .startFromEarliest()
> .property("zookeeper.connect", *“*xxx")
> .property("bootstrap.servers", *“*xxx")
> .property("group.id", *“*xxx"))
> .withFormat(new Json().deriveSchema())
> .withSchema(new Schema()
>
> .field("rowtime", Types.SQL_TIMESTAMP)
> .rowtime(new Rowtime()
> .timestampsFromField("createTime")
> .watermarksPeriodicBounded(300_000))
> .field("data", Types.ROW(dataFieldTypes)))
>
> .inAppendMode().registerTableSource(tableName);
>
>
>
> kafka input is:
> {
>
>"data": [
>   18140781,
>],
>"createTime": 1572577137596
> }
>
> Exception as follows:
> Caused by: java.io.IOException: Failed to deserialize JSON object.
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
> Caused by: java.time.format.DateTimeParseException: Text '1553080631582'
> could not be parsed at index 0
> at
> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
> ... 7 more
>
> polaris...@gmail.com
>
>
>
>
>


Re: RemoteEnvironment cannot execute job from local.

2019-11-01 Thread Till Rohrmann
No, this is the expected behaviour. As I said, you should give
createRemoteEnvironment the jar containing your program's user code. Otherwise Flink
cannot find your filter function. That is also why it works when you comment the
filter out: it is simply not needed then.
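For reference, a minimal sketch of what that looks like (host, port and paths are placeholders); the jar handed to createRemoteEnvironment must be the artifact that actually contains the compiled FilterFunction:

```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteFilterJob {

    public static void main(String[] args) throws Exception {
        // This jar must be the one built from THIS program (e.g. via `mvn package`),
        // so that the anonymous FilterFunction below can be loaded on the cluster.
        ExecutionEnvironment env = ExecutionEnvironment
                .createRemoteEnvironment("localhost", 8081, "/path/to/remote-filter-job.jar");

        DataSet<String> data = env.readTextFile("/tmp/file");

        data
            .filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) {
                    return value.startsWith("http://");
                }
            })
            .writeAsText("/tmp/file313");

        env.execute();
    }
}
```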

Cheers,
Till

On Thu, Oct 31, 2019 at 11:41 AM Simon Su  wrote:

> Hi Till
> Thanks for your reply. Actually I modified the code like this:
> I commented out the filter part and re-ran the code, and then it worked well!
> The jar passed to createRemoteEnvironment is a UDF jar, which does not
> contain my code.
> My Flink version is 1.9.0, so I'm confused about the actual behaviour of
> 'createRemoteEnvironment'. Is this a potential bug?
>
> ExecutionEnvironment env = ExecutionEnvironment
> .createRemoteEnvironment("localhost", 8081, "/tmp/udfs.jar");
>
> DataSet<String> data = env.readTextFile("/tmp/file");
>
> data
> //.filter(new FilterFunction<String>() {
> //public boolean filter(String value) {
> //return value.startsWith("http://");
> //}
> //})
> .writeAsText("/tmp/file313");
>
> env.execute();
>
>
> Thanks,
> SImon
>
> On 10/31/2019 17:23,Till Rohrmann
>  wrote:
>
> In order to run the program on a remote cluster from the IDE you need to
> first build the jar containing your user code. This jar needs to passed
> to createRemoteEnvironment() so that the Flink client knows which jar to
> upload. Hence, please make sure that /tmp/myudf.jar contains your user code.
>
> Cheers,
> Till
>
> On Thu, Oct 31, 2019 at 9:01 AM Simon Su  wrote:
>
>>
>> Hi all
>>I want to test to submit a job from my local IDE and I deployed a
>> Flink cluster in my vm.
>>Here is my code from Flink 1.9 document and add some of my parameters.
>>
>> public static void main(String[] args) throws Exception {
>> ExecutionEnvironment env = ExecutionEnvironment
>> .createRemoteEnvironment("localhost", 8081, "/tmp/myudf.jar");
>>
>> DataSet<String> data = env.readTextFile("/tmp/file");
>>
>> data
>> .filter(new FilterFunction<String>() {
>> public boolean filter(String value) {
>> return value.startsWith("http://");
>> }
>> })
>> .writeAsText("/tmp/file1");
>>
>> env.execute();
>> }
>>
>> When I run the program, it raises an error like:
>>
>> Exception in thread "main"
>> org.apache.flink.client.program.ProgramInvocationException: Job failed.
>> (JobID: 1f32190552e955bb2048c31930edfb0e)
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
>> at
>> org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
>> at
>> org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
>> at
>> org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
>> at
>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
>> at TestMain.main(TestMain.java:25)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>> at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
>> ... 8 more
>> *Caused by: java.lang.RuntimeException: The initialization of the
>> DataSource's outputs caused an error: Could not read the user code wrapper:
>> TestMain$1*
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>> *Caused by:
>> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
>> Could not read the user code wrapper: TestMain$1*
>> at
>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
>> at
>> org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
>> at
>> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
>> at
>> org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
>> at
>> org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
>> at
>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
>> ... 3 more
>> *Caused by: java.lang.ClassNotFoundException: TestMain$1*
>> at java.net.URLClassLoader.findClass(URLClas

Re: The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged

2019-11-01 Thread Till Rohrmann
Hi Regina,

at the moment the community is working towards the 1.10 release, with a lot of
features still being completed. The intended feature freeze is at the end of
November. Due to this it is quite hard to tell when exactly this problem
will be properly fixed, but we'll try our best.

Cheers,
Till

On Thu, Oct 31, 2019 at 4:59 PM Chan, Regina  wrote:

> Yeah I saw FLINK-13184 earlier and started watching it. I can see the
> second optimization being helpful too in a large cluster. I’ll be watching
> this as well. Do you have an estimate as to turn around time? Would be
> helpful planning-wise.
>
>
>
>
>
> *From:* Yang Wang 
> *Sent:* Thursday, October 31, 2019 4:08 AM
> *To:* Chan, Regina [Engineering] 
> *Cc:* Till Rohrmann ; user 
> *Subject:* Re: The RMClient's and YarnResourceManagers internal state
> about the number of pending container requests has diverged
>
>
>
> I think till's analysis is right. I just want to share more information.
>
>
>
> After diving into the logs of the Flink resource manager and the Yarn resource
> manager, I found that the excess
>
> containers come from two sides.
>
>
>
> ** Yarn Container Allocation Mechanism **
>
> Receiving more containers than requested is unavoidable. Imagine that we
> want to allocate 120 containers
>
> from Yarn. The size of container request in the *heartbeat1* will be 120.
> When Yarn RM received the
>
> request and cannot allocate any containers because there are not enough resources.
> So the allocated containers in
>
> response of *heartbeat1 *will be 0. The Flink resource manager does not
> get any containers and will
>
> set the size of container request in *heartbeat2 *to 120. However, Yarn
> resource manager has allocated
>
> 120 containers between *heartbeat1* to *heartbeat2*. When Yarn Resource
> Manager receives *heartbeat2*, it will
>
> add the 120 containers to the response of *heartbeat2*, and it starts to
> allocate for the new request of 120. Since
>
> Flink resource manager has received all containers, it will set the size
> of container request in *heartbeat3* to 0.
>
> Yarn Resource Manager allocate 100 containers between *heartbeat2* to
> *heartbeat3*, it will set the 100 containers
>
> to response of *heartbeat3*. So Flink Resource Manager gets the 100
> excess containers.
>
>
>
> Note: Heartbeat means the heartbeat between Flink resource manager(Yarn
> client) and Yarn resource manager.
>
>
>
>
>
> ** Flink resource manager allocates more than it really needs **
>
> Now in the onContainersAllocated of FlinkYarnResourceManager, we iterate
> through each container.
>
> Processing each container takes more than 50 ms; most of the time is spent
> uploading {uuid}-taskmanager-conf.yaml to hdfs
>
> and starting the container. So if more than 10 containers are allocated,
> FlinkYarnResourceManager cannot remove
>
> container requests in time and will allocate more than it really needs.
>
>
>
>
>
> For the first cause, on the Yarn side, we could not do anything more from Flink.
> However, for the second, we could reduce the time
>
> cost of each allocated container so that FlinkYarnResourceManager will allocate only
> what it really needs.  We could have two optimizations
>
> here. The first is use NMClientAsync instead of NMClient to reduce the
> start container time.[1] The
>
> second is *do not *upload {uuid}-taskmanager-conf.yaml, use java options
> or environments instead. [2]
>
>
>
>
>
>
>
>
>
> 1.https://issues.apache.org/jira/browse/FLINK-13184
> 
>
> 2. https://issues.apache.org/jira/browse/FLINK-14582
> 
>
>
>
> Chan, Regina wrote on Thursday, Oct 31, 2019 at 5:09 AM:
>
> Just to provide a little bit of context, this behavior is highly
> problematic since we run these jobs at scale. This one job when running on
> 1.6 over-allocated *2500* containers. On 1.9, with a one-minute heartbeat
> interval, we were able to bring that number of excess containers down to
> 230. My fear is that 230 excess containers is due to the fact that we also
> moved this to a smaller cluster so that it doesn’t have the potential of
> causing wider impact it did on the main cluster. We have over 70K jobs
> running in a day so imagine how bad this could become so I definitely
> appreciate your attention to this.
>
>
>
> I’m open to a minimum and max number of TaskExecutors, the max number is
> probably the biggest concern. Can help test this whenever it’s ready and
> again greatly appreciate it.
>
>
>
> Separately, I think this loosely ties into another thread

Re: low performance in running queries

2019-11-01 Thread Habib Mostafaei
I used the streaming WordCount provided by Flink, and the file contains text
like "This is some text...". I just copied it several times.


Best,

Habib

On 11/1/2019 6:03 AM, Zhenghua Gao wrote:

2019-10-30 15:59:52,122 INFO  org.apache.flink.runtime.taskmanager.Task
 - Split Reader: Custom File Source -> Flat Map (1/1) 
(6a17c410c3e36f524bb774d2dffed4a4) switched from DEPLOYING to RUNNING.
2019-10-30 17:45:10,943 INFO  org.apache.flink.runtime.taskmanager.Task
 - Split Reader: Custom File Source -> Flat Map (1/1) 
(6a17c410c3e36f524bb774d2dffed4a4) switched from RUNNING to FINISHED.
It's surprising that the source task takes 95 minutes to read a 2 GB file.
Could you give me your code snippets and some sample lines of the 2 GB file?
I will try to reproduce your scenario and dig into the root cause.
*Best Regards,*
*Zhenghua Gao*


On Thu, Oct 31, 2019 at 9:05 PM Habib Mostafaei wrote:


I enclosed all logs from the run and for this run I used
parallelism one. However, for other runs I checked and found that
all parallel workers were working properly. Is there a simple way
to get profiling information in Flink?

Best,

Habib

On 10/31/2019 2:54 AM, Zhenghua Gao wrote:

I think more runtime information would help figure
out where the problem is.
1) how many parallelisms actually working
2) the metrics for each operator
3) the jvm profiling information, etc

*Best Regards,*
*Zhenghua Gao*


On Wed, Oct 30, 2019 at 8:25 PM Habib Mostafaei wrote:

Thanks Gao for the reply. I used the parallelism parameter
with different values like 6 and 8 but still the execution
time is not comparable with a single threaded python script.
What would be the reasonable value for the parallelism?

Best,

Habib

On 10/30/2019 1:17 PM, Zhenghua Gao wrote:

The reason might be the parallelism of your task is only 1,
that's too low.
See [1] to specify proper parallelism  for your job, and the
execution time should be reduced significantly.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html

*Best Regards,*
*Zhenghua Gao*


On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei wrote:

Hi all,

I am running Flink on a standalone cluster and getting
very long
execution time for the streaming queries like WordCount
for a fixed text
file. My VM runs on a Debian 10 with 16 cpu cores and
32GB of RAM. I
have a text file with size of 2GB. When I run the Flink
on a standalone
cluster, i.e., one JobManager and one taskManager with
25GB of heapsize,
it took around two hours to finish counting this file
while a simple
python script can do it in around 7 minutes. Just
wondering what is
wrong with my setup. I ran the experiments on a cluster
with six
taskManagers, but I still get very long execution time
like 25 minutes
or so. I tried to increase the JVM heap size to have
lower execution
time but it did not help. I attached the log file and
the Flink
configuration file to this email.

Best,

Habib





Re: low performance in running queries

2019-11-01 Thread Piotr Nowojski
Hi,

>  Is there a simple way to get profiling information in Flink?

Flink doesn’t provide any special tooling for that. Just use your chosen 
profiler, for example: Oracle’s Mission Control (free on non-production 
clusters, no need to install anything if already using Oracle’s JVM), VisualVM 
(free, I think), or YourKit (paid). For each of them there is plenty of 
online material on how to use them for both local and remote profiling.

Piotrek

> On 31 Oct 2019, at 14:05, Habib Mostafaei  wrote:
> 
> I enclosed all logs from the run and for this run I used parallelism one. 
> However, for other runs I checked and found that all parallel workers were 
> working properly. Is there a simple way to get profiling information in Flink?
> 
> Best,
> 
> Habib
> 
> On 10/31/2019 2:54 AM, Zhenghua Gao wrote:
>> I think more runtime information would help figure out where the problem is.
>> 1) how many parallelisms actually working
>> 2) the metrics for each operator
>> 3) the jvm profiling information, etc
>> 
>> Best Regards,
>> Zhenghua Gao
>> 
>> 
>> On Wed, Oct 30, 2019 at 8:25 PM Habib Mostafaei > > wrote:
>> Thanks Gao for the reply. I used the parallelism parameter with different 
>> values like 6 and 8 but still the execution time is not comparable with a 
>> single threaded python script. What would be the reasonable value for the 
>> parallelism?
>> 
>> Best,
>> 
>> Habib
>> 
>> On 10/30/2019 1:17 PM, Zhenghua Gao wrote:
>>> The reason might be the parallelism of your task is only 1, that's too low.
>>> See [1] to specify proper parallelism  for your job, and the execution time 
>>> should be reduced significantly.
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html 
>>> 
>>> 
>>> Best Regards,
>>> Zhenghua Gao
>>> 
>>> 
>>> On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei >> > wrote:
>>> Hi all,
>>> 
>>> I am running Flink on a standalone cluster and getting very long 
>>> execution time for the streaming queries like WordCount for a fixed text 
>>> file. My VM runs on a Debian 10 with 16 cpu cores and 32GB of RAM. I 
>>> have a text file with size of 2GB. When I run the Flink on a standalone 
>>> cluster, i.e., one JobManager and one taskManager with 25GB of heapsize, 
>>> it took around two hours to finish counting this file while a simple 
>>> python script can do it in around 7 minutes. Just wondering what is 
>>> wrong with my setup. I ran the experiments on a cluster with six 
>>> taskManagers, but I still get very long execution time like 25 minutes 
>>> or so. I tried to increase the JVM heap size to have lower execution 
>>> time but it did not help. I attached the log file and the Flink 
>>> configuration file to this email.
>>> 
>>> Best,
>>> 
>>> Habib
>>> 
> 
> 



Re: No FileSystem for scheme "file" for S3A in and state processor api in 1.9

2019-11-01 Thread Piotr Nowojski
Ok, thanks for the explanation, now it makes sense. Previously I hadn’t noticed 
that those snapshot state calls visible in your stack trace come from the State 
Processor API. We will try to reproduce it, so we might have more questions 
later, but this information might be enough.

One more question for now, have you tried using Presto for the 
bootstrapping/batch job as well?

Piotrek

> On 31 Oct 2019, at 23:49, spoganshev  wrote:
> 
> The problem happens in batch jobs (the ones that use ExecutionEnvironment)
> that use state processor api for bootstrapping initial savepoint for
> streaming job. 
> 
> We are building a single docker image for streaming and batch versions of
> the job. In that image we put both presto (which we use for checkpoints in
> streaming job) and hadoop to separate plugin folders. When we run a batch
> job using this image the aforementioned exception happens.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



How to emit changed data only w/ Flink trigger?

2019-11-01 Thread Qi Kang
Hi all,


We have a Flink job which aggregates sales volume and GMV data of each site on 
a daily basis. The code skeleton is shown as follows.


```
sourceStream
 .map(message -> JSON.parseObject(message, OrderDetail.class))
 .keyBy("siteId")
 .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
 .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
 .aggregate(new VolumeGmvAggregateFunc());
```


The window is triggered every second in order to refresh the data displayed on 
a real-time dashboard. Is there some way to output only those sites’ data which 
changed in the last 1-second period? Currently we’ve got 1000+ sites, so frequently 
emitting all aggregation records seems somewhat expensive.


BR, Qi Kang




is Streaming Ledger open source?

2019-11-01 Thread kant kodali
Hi All,

Is https://github.com/dataArtisans/da-streamingledger an open-source
project? It looks to me like this project is not actively maintained. Is that
correct? The last commit is one year ago and it shows there are 0
contributors.

Thanks!


Re: Flink 1.5+ performance in a Java standalone environment

2019-11-01 Thread Till Rohrmann
Hi Jakub,

what are the cluster settings and the exact job settings you are running
your job with? I'm asking because one difference between legacy and FLIP-6
mode is that the legacy mode spreads out tasks across all available
TaskManagers whereas the FLIP-6 mode tries to bin-pack them on as few
TaskManagers as possible. If you have more slots than the parallelism of
your job, then I could see how this could affect the performance of your
job if it is not I/O bound but CPU bound. We will add an option to enable
the old spread out strategy again [1].

Another reason why you might see a performance degradation is the placement
of key groups. In the legacy mode, Flink distributed them so that two
TaskManagers with the same number of tasks would only have at most one key
group more. In FLIP-6 it can be up to the number of slots more key groups
on one of the TaskManagers. In order to mitigate this problem I would
recommend to set the maximum parallelism (== number of key groups) to a
multiple of your parallelism.
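As a hedged illustration of that last point (streaming jobs with keyed state; the multiplier below is arbitrary), the maximum parallelism can be set programmatically:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismSetup {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        int parallelism = 4;
        env.setParallelism(parallelism);
        // Keep the number of key groups a multiple of the parallelism
        // (here 4 * 32 = 128), so keyed state is spread evenly across slots.
        env.setMaxParallelism(parallelism * 32);
    }
}
```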

[1] https://issues.apache.org/jira/browse/FLINK-12122

Cheers,
Till

On Wed, Oct 30, 2019 at 4:28 PM Jakub Danilewicz <
jdanilew...@alto-analytics.com> wrote:

> Hi,
>
> I can confirm that the performance drop is directly related to FLIP-6
> changes. Applying this modification to the code posted above restores the
> previous graph processing speed under Flink 1.5.6:
>
> ---
>
> org.apache.flink.configuration.Configuration customConfig = new
> org.apache.flink.configuration.Configuration();
> customConfig.setString("mode", "legacy");
> final ExecutionEnvironment env =
> ExecutionEnvironment.createLocalEnvironment(customConfig);
> env.setParallelism(parallelism);
>
> ---
>
> Disabling the "taskmanager.network.credit-model" parameter in
> Configuration provides only a very slight improvement in the performance
> under Flink 1.5.6.
>
> Now the big question: what about newer versions where the legacy mode is
> not supported anymore? I checked Flink 1.8.2 and it does not work.
>
> Is there any way to make the new mode as performant as the "legacy" one in
> the standalone scenarios? Alternatively may we expect improvements in this
> area in the upcoming releases?
>
> Best,
>
> Jakub
>
> On 2019/10/30 14:11:19, Piotr Nowojski  wrote:
> > Hi,
> >
> > In Flink 1.5 there were three big changes, that could affect
> performance.
> > 1. FLIP-6 changes (As previously Yang and Fabian mentioned)
> > 2. Credit base flow control (especially if you are using SSL)
> > 3. Low latency network changes
> >
> > I would suspect them in that order. First and second you can disable via
> configuration switches [1] and [2] respectively.
> >
> > [1] “mode:legacy"
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
> <
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
> >
> > [2] "taskmanager.network.credit-model:false”
> >
> > Could you try disabling them out?
> >
> > Piotrek
> >
> > > On 28 Oct 2019, at 14:10, Jakub Danilewicz <
> jdanilew...@alto-analytics.com> wrote:
> > >
> > > Thanks for your replies.
> > >
> > > We use Flink from within a standalone Java 8 application (no Hadoop,
> no clustering), so it's basically boils down to running a simple code like
> this:
> > >
> > > import java.util.*;
> > > import org.apache.flink.api.java.ExecutionEnvironment;
> > > import org.apache.flink.graph.*;
> > > import org.apache.flink.graph.library.CommunityDetection;
> > >
> > > public class FlinkTester {
> > >final Random random = new Random(1);
> > >final float density = 3.0F;
> > >
> > >public static void main(String[] args) throws Exception {
> > >new FlinkTester().execute(100, 4);
> > >}
> > >
> > >private void execute(int numEdges, int parallelism) throws
> Exception {
> > >final ExecutionEnvironment env =
> ExecutionEnvironment.createLocalEnvironment(parallelism);
> > >final Graph graph = createGraph(numEdges,
> env);
> > >
> > >final long start = System.currentTimeMillis();
> > >List> vertices = graph.run(new
> CommunityDetection(10, 0.5)).getVertices().collect();
> > >System.out.println(vertices.size() + " vertices processed in "
> + (System.currentTimeMillis()-start)/1000 + " s");
> > >}
> > >
> > >private Graph createGraph(int numEdges,
> ExecutionEnvironment env) {
> > >System.out.println("Creating new graph of " + numEdges + "
> edges...");
> > >
> > >final int maxNumVertices = (int)(numEdges/density);
> > >final Map> vertexMap = new
> HashMap<>(maxNumVertices);
> > >final Map> edgeMap = new
> HashMap<>(numEdges);
> > >
> > >while (edgeMap.size() < numEdges) {
> > >long sourceId = random.nextInt(maxNumVertices) + 1;
> > >long targetId = sourceId;
> > > 

Re: Stateful functions presentation code (UI part)

2019-11-01 Thread Igal Shilman
Hi Flavio, let me try to clarify:

The intention of this example is to demonstrate how
different entities (drivers, passengers, etc') participates in a protocol
(ride matching). For that we have the stateful functions application, and a
standalone java application that just generates the events to trigger the
simulation (i.e. a passenger requests a ride would be an event that the
simulator emits).
The visualization aspect of this example is not a part of what we are
trying to demonstrate but rather, an out-of-band peek into the live system
(websocket that duplicates the events directly to the UI) which we thought
is a nice addition to the example that visualizes how cars are moving on
the grid :-)

If your goal is to create a scalable visualization of car locations, then I
guess your suggestion to break the grid into distinct geo-regions can work.
For that you would need:
1. An additional egress backed by a Kafka topic.
2. Modify the driver function to send location changes to that egress,
where you'd use the region id (geohash with a resolution to your liking) as
key [1] (see the sketch after this list).
3. At the consuming side subscribe to the partition that holds the region
of interest.
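To make step 2 concrete, here is a small illustrative sketch of deriving such a region id without any geohash library (the class is made up for this example; a real geohash implementation works just as well):

```java
public final class RegionKey {

    private RegionKey() {}

    // Maps a coordinate to a coarse grid cell id usable as the Kafka record key.
    // cellSizeDegrees controls the resolution (e.g. 0.5 degrees per cell).
    public static String of(double latitude, double longitude, double cellSizeDegrees) {
        long latCell = (long) Math.floor(latitude / cellSizeDegrees);
        long lonCell = (long) Math.floor(longitude / cellSizeDegrees);
        return latCell + ":" + lonCell;
    }

    public static void main(String[] args) {
        // Nearby coordinates fall into the same cell and thus the same partition.
        System.out.println(of(52.52, 13.40, 0.5)); // prints "105:26"
    }
}
```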

But, realistically speaking for this kind of a visualization you would be
better off consuming from the locations update topic (from-driver topic)
and populating an optimized geo-indexed store,
and querying it directly.

I hope this clarifies things,
Igal

[1]
https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions/src/main/java/com/ververica/statefun/examples/ridesharing/KafkaSpecs.java#L121

On Thu, Oct 31, 2019 at 6:21 PM Flavio Pompermaier 
wrote:

> Thanks Igal, this is more or less what I was expecting..this implies that
> ALL events are received on the UI side.
> I was concerned about the tradeoffs of this choice: when I zoom on the map
> I could simply ignore messages outside the boundaries (but I still spend
> many cpu resource in the reading of useless messages).
>
> In the case of a worldwide company (eg uber or simolar) it's probably better
> to create a topic per geographical arean..but also in this case then the UI
> should know when to attach or detach from queue topics when it reach a
> section of the map served by different topics..in another case I could just
> have too many events also in the single map section (let's think about some
> big city with many user).
>
> Is there any talk at FF ir someone else that faced those issues too?
>
> Il Gio 31 Ott 2019, 17:44 Igal Shilman  ha scritto:
>
>> For that particular example, the simulator [1] is responsible for
>> simulating physical drivers and passengers that interact with their
>> corresponding
>> stateful functions [2].
>> The interaction between the simulator and the stateful functions is
>> happening via four Kafka topics:
>> * to-driver - messages that are sent from the physical drivers (the
>> simulator [1]) to the stateful functions. The messages always carry a
>> driver-id which acts as a routing key (I think that this is what you mean
>> by correlation id) to a specific driver stateful function (FnDriver)
>> * from-driver - messages that are sent from a stateful function with a
>> specific driver id to the simulator
>> * to-passenger - symmetric to to-driver
>> * from-passenger - symmetric to from-driver.
>> The ingress and egress definition are specified here [3], and you may
>> want to checkout how to router is defined as well [4][5].
>>
>> In addition the simulator is also feeding the UI directly by duplicating
>> the messages to a web socket (see [6])
>>
>> I hope this clarifies the examples.
>>
>> Igal.
>>
>> [1]
>> https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-simulator/src/main/java/com/ververica/statefun/examples/ridesharing/simulator
>> [2]
>> https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions
>> [3]
>> https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions/src/main/java/com/ververica/statefun/examples/ridesharing/KafkaSpecs.java#L43
>> [4]
>> https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions/src/main/java/com/ververica/statefun/examples/ridesharing/Module.java#L33
>> [5]
>> https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions/src/main/java/com/ververica/statefun/examples/ridesharing/InboundDriverRouter.java#L26
>> [6]
>> https://github.com/verv

Re: How to emit changed data only w/ Flink trigger?

2019-11-01 Thread kant kodali
I am new to Flink, so I am not sure if I am giving you the correct answer;
you might want to wait for others to respond. But I think you should do

.inUpsertMode()


On Fri, Nov 1, 2019 at 2:38 AM Qi Kang  wrote:

> Hi all,
>
>
> We have a Flink job which aggregates sales volume and GMV data of each
> site on a daily basis. The code skeleton is shown as follows.
>
>
> ```
> sourceStream
>  .map(message -> JSON.parseObject(message, OrderDetail.class))
>  .keyby("siteId")
>  .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
>  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
>  .aggregate(new VolumeGmvAggregateFunc());
> ```
>
>
> The window is triggered every second in order to refresh the data
> displayed on a real-time dashboard. Is there some way to output only those
> sites’ data which changed in 1 second period? Currently we’ve got 1000+
> sites, so frequently emitting all aggregation records seems somewhat
> expensive.
>
>
> BR, Qi Kang
>
>
>


Re: Async operator with a KeyedStream

2019-11-01 Thread vino yang
Hi Bastien,

Your analysis of using a KeyedStream with Async I/O is correct. It will not
take the key into account.

In your scenario, the good practice for interacting with a DB is async I/O +
a thread pool [1] + a connection pool.

You can use a connection pool to reuse and limit the MySQL connections.

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#implementation-tips
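To illustrate that combination, here is a hedged sketch (Event, the SQL statement and createPooledDataSource() are placeholders, not something from this thread): a RichAsyncFunction that hands the blocking JDBC work to a bounded thread pool backed by a connection pool.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.sql.DataSource;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncDbWriter extends RichAsyncFunction<AsyncDbWriter.Event, AsyncDbWriter.Event> {

    // Placeholder element type for this sketch.
    public static class Event {
        public long id;
        public String value;
    }

    private transient ExecutorService executor; // bounds the number of in-flight DB requests
    private transient DataSource pool;          // connection pool, e.g. HikariCP (placeholder)

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(10);
        pool = createPooledDataSource(); // placeholder: build and configure your pool here
    }

    @Override
    public void asyncInvoke(Event input, ResultFuture<Event> resultFuture) {
        executor.submit(() -> {
            try (Connection conn = pool.getConnection();
                 PreparedStatement stmt =
                         conn.prepareStatement("INSERT INTO t (id, v) VALUES (?, ?)")) {
                stmt.setLong(1, input.id);
                stmt.setString(2, input.value);
                stmt.executeUpdate();
                resultFuture.complete(Collections.singleton(input));
            } catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
    }

    @Override
    public void close() {
        if (executor != null) {
            executor.shutdown();
        }
    }

    private static DataSource createPooledDataSource() {
        throw new UnsupportedOperationException("placeholder for your pooled DataSource setup");
    }
}
```

It would then be wired in with AsyncDataStream.orderedWait(...) or unorderedWait(...), choosing the async capacity to roughly match the pool size.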

bastien dine wrote on Thursday, Oct 31, 2019 at 4:36 PM:

> Hello,
>
> I would like to know if you can use a KeyedStream with the Async operator :
> I want to use the async operator to insert some stuff in my database but I
> want to limit 1 request per element (with key=id) at a time
> With a regular keyBy / map it's working, but it's too slow (I don't have
> enough resources to increase my parallelism),
>
> As far as I have seen, this is not possible
> When I write something like
> Async.orderedWait(myStream.keyBy(myKeyselector)), the keyBy is totally
> ignored
>
> Have you a solution for this?
>
> Best Regards,
> Bastien
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>


Re: How to emit changed data only w/ Flink trigger?

2019-11-01 Thread Taher Koitawala
You can do this by writing a custom trigger or evictor.
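As an alternative to a custom trigger, a hedged sketch of suppressing unchanged results downstream (SiteStats stands in for the aggregate type and must have a meaningful equals(); class and field names are made up): keep the 1-second trigger, key the window output by siteId again, and forward a record only when it differs from the last one emitted for that site.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EmitOnChange extends KeyedProcessFunction<String, SiteStats, SiteStats> {

    private transient ValueState<SiteStats> lastEmitted;

    @Override
    public void open(Configuration parameters) {
        lastEmitted = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-emitted", SiteStats.class));
    }

    @Override
    public void processElement(SiteStats current, Context ctx, Collector<SiteStats> out) throws Exception {
        SiteStats previous = lastEmitted.value();
        if (previous == null || !previous.equals(current)) {
            lastEmitted.update(current);
            out.collect(current); // forward only when the aggregate actually changed
        }
    }
}
```

It would be attached after the window, e.g. `.aggregate(new VolumeGmvAggregateFunc()).keyBy(s -> s.siteId).process(new EmitOnChange())`.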

On Fri, Nov 1, 2019 at 3:08 PM Qi Kang  wrote:

> Hi all,
>
>
> We have a Flink job which aggregates sales volume and GMV data of each
> site on a daily basis. The code skeleton is shown as follows.
>
>
> ```
> sourceStream
>  .map(message -> JSON.parseObject(message, OrderDetail.class))
>  .keyby("siteId")
>  .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
>  .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
>  .aggregate(new VolumeGmvAggregateFunc());
> ```
>
>
> The window is triggered every second in order to refresh the data
> displayed on a real-time dashboard. Is there some way to output only those
> sites’ data which changed in 1 second period? Currently we’ve got 1000+
> sites, so frequently emitting all aggregation records seems somewhat
> expensive.
>
>
> BR, Qi Kang
>
>
>


Re: Flink 1.5+ performance in a Java standalone environment

2019-11-01 Thread Jakub Danilewicz
Thanks for your reply, Till.

As mentioned above I execute graph processing in a straight-ahead Java 
standalone environment (no cluster underneath, no specific configuration except 
for parallelism), just as if you simply ran the Java class I pasted upthread 
with a Flink distribution JAR (plus Gelly and Slf4j/Log4j JARs) on its 
classpath. 

I do not know what goes on behind the scenes, but the "legacy" mode 
significantly outperforms the "new" one in every single case. The new mode is a 
few times slower, getting worse and worse with the increasing size of the graph.

As for setting "the maximum parallelism (== number of key groups) to a multiple 
of your parallelism", could you tell me which configuration option from the 
list below it is?

https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html 

Best,

Jakub


On 2019/11/01 10:19:47, Till Rohrmann  wrote: 
> Hi Jakub,
> 
> what are the cluster settings and the exact job settings you are running
> your job with? I'm asking because one difference between legacy and FLIP-6
> mode is that the legacy mode spreads out tasks across all available
> TaskManagers whereas the FLIP-6 mode tries to bin package them on as few
> TaskManagers as possible. If you have more slots than the parallelism of
> your job, then I could see how this could affect the performance of your
> job if it is not I/O bound but CPU bound. We will add an option to enable
> the old spread out strategy again [1].
> 
> Another reason why you might see a performance degradation is the placement
> of key groups. In the legacy mode, Flink distributed them so that two
> TaskManagers with the same number of tasks would only have at most one key
> group more. In FLIP-6 it can be up to the number of slots more key groups
> on one of the TaskManagers. In order to mitigate this problem I would
> recommend to set the maximum parallelism (== number of key groups) to a
> multiple of your parallelism.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-12122
> 
> Cheers,
> Till
> 
> On Wed, Oct 30, 2019 at 4:28 PM Jakub Danilewicz <
> jdanilew...@alto-analytics.com> wrote:
> 
> > Hi,
> >
> > I can confirm that the performance drop is directly related to FLIP-6
> > changes. Applying this modification to the code posted above restores the
> > previous graph processing speed under Flink 1.5.6:
> >
> > ---
> >
> > org.apache.flink.configuration.Configuration customConfig = new
> > org.apache.flink.configuration.Configuration();
> > customConfig.setString("mode", "legacy");
> > final ExecutionEnvironment env =
> > ExecutionEnvironment.createLocalEnvironment(customConfig);
> > env.setParallelism(parallelism);
> >
> > ---
> >
> > Disabling the "taskmanager.network.credit-model" parameter in
> > Configuration provides only a very slight improvement in the performance
> > under Flink 1.5.6.
> >
> > Now the big question: what about newer versions where the legacy mode is
> > not supported anymore? I checked Flink 1.8.2 and it does not work.
> >
> > Is there any way to make the new mode as performant as the "legacy" one in
> > the standalone scenarios? Alternatively may we expect improvements in this
> > area in the upcoming releases?
> >
> > Best,
> >
> > Jakub
> >
> > On 2019/10/30 14:11:19, Piotr Nowojski  wrote:
> > > Hi,
> > >
> > > In Flink 1.5 there were three big changes, that could affect
> > performance.
> > > 1. FLIP-6 changes (As previously Yang and Fabian mentioned)
> > > 2. Credit base flow control (especially if you are using SSL)
> > > 3. Low latency network changes
> > >
> > > I would suspect them in that order. First and second you can disable via
> > configuration switches [1] and [2] respectively.
> > >
> > > [1] “mode:legacy"
> > https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
> > <
> > https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#core
> > >
> > > [2] "taskmanager.network.credit-model:false”
> > >
> > > Could you try disabling them out?
> > >
> > > Piotrek
> > >
> > > > On 28 Oct 2019, at 14:10, Jakub Danilewicz <
> > jdanilew...@alto-analytics.com> wrote:
> > > >
> > > > Thanks for your replies.
> > > >
> > > > We use Flink from within a standalone Java 8 application (no Hadoop,
> > no clustering), so it's basically boils down to running a simple code like
> > this:
> > > >
> > > > import java.util.*;
> > > > import org.apache.flink.api.java.ExecutionEnvironment;
> > > > import org.apache.flink.graph.*;
> > > > import org.apache.flink.graph.library.CommunityDetection;
> > > >
> > > > public class FlinkTester {
> > > >final Random random = new Random(1);
> > > >final float density = 3.0F;
> > > >
> > > >public static void main(String[] args) throws Exception {
> > > >new FlinkTester().execute(100, 4);
> > > >}
> > > >
> > > >

Re: Stateful functions presentation code (UI part)

2019-11-01 Thread Flavio Pompermaier
Thanks Igal for the detailed explanantion. I know that this was only a
demo, I just wanted to reason a bit on the pros and cons of sending data to
an UI from Flink.

Best,
Flavio


Il Ven 1 Nov 2019, 12:21 Igal Shilman  ha scritto:

> Hi Flavio, let me try to clarify:
>
> The intention of this example is to demonstrate how
> different entities (drivers, passengers, etc') participates in a protocol
> (ride matching). For that we have the stateful functions application, and a
> standalone java application that just generates the events to trigger the
> simulation (i.e. a passenger requests a ride would be an event that the
> simulator emits).
> The visualization aspect of this example is not a part of what we are
> trying to demonstrate but rather, an out-of-band peek into the live system
> (websocket that duplicates the events directly to the UI) which we thought
> is a nice addition to the example that visualizes how cars are moving on
> the grid :-)
>
> If your goal is to create a scalable visualization of car locations, then
> I guess your suggestion to break the grid into distinct geo-regions can
> work. For that you would need:
> 1. An additional egress backed by a Kafka topic.
> 2. Modify the driver function to send location changes to that egress,
> where you'd use the region id (geohash with a resolution to your liking) as
> key [1].
> 3. At the consuming side subscribe to the partition that holds the region
> of interest.
>
> But, realistically speaking for this kind of a visualization you would be
> better off consuming from the locations update topic (from-driver topic)
> and populating an optimized geo-indexed store,
> and querying it directly.
>
> I hope this clarifies things,
> Igal
>
> [1]
> https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions/src/main/java/com/ververica/statefun/examples/ridesharing/KafkaSpecs.java#L121
>
> On Thu, Oct 31, 2019 at 6:21 PM Flavio Pompermaier 
> wrote:
>
>> Thanks Igal, this is more or less what I was expecting..this implies that
>> ALL events are received on the UI side.
>> I was concerned about the tradeoffs of this choice: when I zoom on the
>> map I could simply ignore messages outside the boundaries (but I still
>> spend many cpu resource in the reading of useless messages).
>>
>> In the case of a worldwide company (eg uber or simolar) it's probably better
>> to create a topic per geographical arean..but also in this case then the UI
>> should know when to attach or detach from queue topics when it reach a
>> section of the map served by different topics..in another case I could just
>> have too many events also in the single map section (let's think about some
>> big city with many user).
>>
>> Is there any talk at FF ir someone else that faced those issues too?
>>
>> Il Gio 31 Ott 2019, 17:44 Igal Shilman  ha scritto:
>>
>>> For that particular example, the simulator [1] is responsible for
>>> simulating physical drivers and passengers that interact with their
>>> corresponding
>>> stateful functions [2].
>>> The interaction between the simulator and the stateful functions is
>>> happening via four Kafka topics:
>>> * to-driver - messages that are sent from the physical drivers (the
>>> simulator [1]) to the stateful functions. The messages always carry a
>>> driver-id which acts as a routing key (I think that this is what you mean
>>> by correlation id) to a specific driver stateful function (FnDriver)
>>> * from-driver - messages that are sent from a stateful function with a
>>> specific driver id to the simulator
>>> * to-passenger - symmetric to to-driver
>>> * from-passenger - symmetric to from-driver.
>>> The ingress and egress definition are specified here [3], and you may
>>> want to checkout how to router is defined as well [4][5].
>>>
>>> In addition the simulator is also feeding the UI directly by duplicating
>>> the messages to a web socket (see [6])
>>>
>>> I hope this clarifies the examples.
>>>
>>> Igal.
>>>
>>> [1]
>>> https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-simulator/src/main/java/com/ververica/statefun/examples/ridesharing/simulator
>>> [2]
>>> https://github.com/ververica/stateful-functions/tree/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions
>>> [3]
>>> https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions/src/main/java/com/ververica/statefun/examples/ridesharing/KafkaSpecs.java#L43
>>> [4]
>>> https://github.com/ververica/stateful-functions/blob/master/stateful-functions-examples/stateful-functions-ridesharing-example/stateful-functions-ridesharing-example-functions/src/main/java/com/ververica/s

[ANNOUNCE] Progress of Apache Flink 1.10 #2

2019-11-01 Thread Gary Yao
Hi community,

Because we have approximately one month of development time left until the
targeted Flink 1.10 feature freeze, we thought now would be a good time to
give another progress update. Below we have included a list of the ongoing
efforts that have made progress since our last release progress update [1].
As
always, if you are working on something that is not included here, feel free
to use this thread to share your progress.

- Support Java 11 [2]
- Implementation is in progress (18/21 subtasks resolved)

- Table API improvements
- Full Data Type Support in Planner [3]
- Implementing (1/8 subtasks resolved)
- FLIP-66 Support Time Attribute in SQL DDL [4]
- Implementation is in progress (1/7 subtasks resolved).
- FLIP-70 Support Computed Column [5]
- FLIP voting [6]
- FLIP-63 Rework Table Partition Support [7]
- Implementation is in progress (3/15 subtasks resolved).
- FLIP-51 Rework of Expression Design [8]
- Implementation is in progress (2/12 subtasks resolved).
- FLIP-64 Support for Temporary Objects in Table Module [9]
- Implementation is in progress

- Hive compatibility completion (DDL/UDF) to support full Hive integration
- FLIP-57 Rework FunctionCatalog [10]
- Implementation is in progress (6/9 subtasks resolved)
- FLIP-68 Extend Core Table System with Modular Plugins [11]
- Implementation is in progress (2/8 subtasks resolved)

- Finer grained resource management
- FLIP-49: Unified Memory Configuration for TaskExecutors [12]
- Implementation is in progress (6/10 subtasks resolved)
- FLIP-53: Fine Grained Operator Resource Management [13]
- Implementation is in progress (1/9 subtasks resolved)

- Finish scheduler re-architecture [14]
- Integration tests are being enabled for new scheduler

- Executor/Client refactoring [15]
- FLIP-81: Executor-related new ConfigOptions [16]
- done
- FLIP-73: Introducing Executors for job submission [17]
- Implementation is in progress

- FLIP-36 Support Interactive Programming [18]
- Is built on top of FLIP-67 [19], which has been accepted
- Implementation in progress

- FLIP-58: Flink Python User-Defined Stateless Function for Table [20]
- Implementation is in progress (12/22 subtask resolved)
- FLIP-50: Spill-able Heap Keyed State Backend [21]
- Implementation is in progress (2/11 subtasks resolved)

- RocksDB Backend Memory Control [22]
- FLIP for resource management on state backend will be opened soon
- Write Buffer Manager will be backported to FRocksDB due to
performance regression [23] in new RocksDB versions

- Unaligned Checkpoints
- FLIP-76 [24] was published and received positive feedback
- Implementation is in progress

- Separate framework and user class loader in per-job mode [25]
- First PR is almost done. Remaining PRs will be ready next week

- Active Kubernetes Integration [26]
- Implementation is in progress (6/11 in review, 3/11 in progress, 2/11
todo)

- FLIP-39 Flink ML pipeline and ML libs [27]
- A few abstract ML classes have been merged (FLINK-13339, FLINK-13513)
- Starting review of algorithms

Again, the feature freeze is targeted to be at the end of November. Please
make sure that all important work threads can be completed by that date.
Feel free to use this thread to communicate any concerns about features that
might not be finished until then. We will send another announcement later in
the release cycle to make the date of the feature freeze official.

Best,
Yu & Gary

[1] https://s.apache.org/wc0dc
[2] https://issues.apache.org/jira/browse/FLINK-10725
[3] https://issues.apache.org/jira/browse/FLINK-14079
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+time+attribute+in+SQL+DDL
[5]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-70%3A+Flink+SQL+Computed+Column+Design
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-70-Flink-SQL-Computed-Column-Design-td34385.html
[7]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
[8]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-51%3A+Rework+of+the+Expression+Design
[9]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
[10]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-57%3A+Rework+FunctionCatalog
[11]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-68%3A+Extend+Core+Table+System+with+Pluggable+Modules
[12]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
[13]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
[14] https://issues.apache.org/jira/browse/FLINK-10429
[15]
https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
[16]
https:

Re: low performance in running queries

2019-11-01 Thread Habib Mostafaei

Hi Piotrek,

Thanks for the list of profilers. I used VisualVM and here is the 
resource usage for taskManager.


Habib


On 11/1/2019 9:48 AM, Piotr Nowojski wrote:

Hi,

>  Is there a simple way to get profiling information in Flink?

Flink doesn’t provide any special tooling for that. Just use your 
chosen profiler, for example: Oracle’s Mission Control (free on non 
production clusters, no need to install anything if already using 
Oracle’s JVM), VisualVM (I think free), YourKit (paid). For each one 
of them there is a plenty of online support how to use them both for 
local and remote profiling.


Piotrek

On 31 Oct 2019, at 14:05, Habib Mostafaei wrote:


I enclosed all logs from the run and for this run I used parallelism 
one. However, for other runs I checked and found that all parallel 
workers were working properly. Is there a simple way to get profiling 
information in Flink?


Best,

Habib

On 10/31/2019 2:54 AM, Zhenghua Gao wrote:
I think more runtime information would help figure 
out where the problem is.

1) how many parallelisms actually working
2) the metrics for each operator
3) the jvm profiling information, etc

*Best Regards,*
*Zhenghua Gao*


On Wed, Oct 30, 2019 at 8:25 PM Habib Mostafaei wrote:


Thanks Gao for the reply. I used the parallelism parameter with
different values like 6 and 8 but still the execution time is
not comparable with a single threaded python script. What would
be the reasonable value for the parallelism?

Best,

Habib

On 10/30/2019 1:17 PM, Zhenghua Gao wrote:

The reason might be the parallelism of your task is only 1,
that's too low.
See [1] to specify proper parallelism  for your job, and the
execution time should be reduced significantly.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html

*Best Regards,*
*Zhenghua Gao*


On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei wrote:

Hi all,

I am running Flink on a standalone cluster and getting very
long
execution time for the streaming queries like WordCount for
a fixed text
file. My VM runs on a Debian 10 with 16 cpu cores and 32GB
of RAM. I
have a text file with size of 2GB. When I run the Flink on
a standalone
cluster, i.e., one JobManager and one taskManager with 25GB
of heapsize,
it took around two hours to finish counting this file while
a simple
python script can do it in around 7 minutes. Just wondering
what is
wrong with my setup. I ran the experiments on a cluster
with six
taskManagers, but I still get very long execution time like
25 minutes
or so. I tried to increase the JVM heap size to have lower
execution
time but it did not help. I attached the log file and the
Flink
configuration file to this email.

Best,

Habib







--
Habib Mostafaei, Ph.D.
Postdoctoral researcher
TU Berlin,
FG INET, MAR 4.003
Marchstraße 23, 10587 Berlin



Re: low performance in running queries

2019-11-01 Thread Piotr Nowojski
Hi,

More important would be the code profiling output. I think VisualVM allows you to 
share the code profiling result as “snapshots”? If you could analyse or share 
this, it would be helpful.

From the attached screenshot the only thing that is visible is that there are 
no GC issues, and secondly that the application is running on only one (out of 10?) 
CPU cores. This hints at one obvious way to improve the performance: scale 
out. However, the WordCount example might not be the best for this, as I’m 
pretty sure its source is fundamentally not parallel.
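For completeness, a hedged sketch of the scale-out idea (the file path and parallelism value are placeholders); the rebalance() after the source spreads the lines across all downstream subtasks even if only a few subtasks end up reading the file:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ParallelWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8); // use the available cores

        env.readTextFile("/path/to/2gb-file")
           .rebalance() // redistribute lines evenly before the CPU-heavy part
           .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
               for (String token : line.toLowerCase().split("\\W+")) {
                   if (!token.isEmpty()) {
                       out.collect(Tuple2.of(token, 1));
                   }
               }
           })
           .returns(Types.TUPLE(Types.STRING, Types.INT))
           .keyBy(0)
           .sum(1)
           .print();

        env.execute("Parallel WordCount");
    }
}
```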

Piotrek

> On 1 Nov 2019, at 15:57, Habib Mostafaei  wrote:
> 
> Hi Piotrek,
> 
> Thanks for the list of profilers. I used VisualVM and here is the resource 
> usage for taskManager.
> 
> 
> 
> Habib
> 
> 
> 
> On 11/1/2019 9:48 AM, Piotr Nowojski wrote:
>> Hi,
>> 
>> >  Is there a simple way to get profiling information in Flink?
>> 
>> Flink doesn’t provide any special tooling for that. Just use your chosen 
>> profiler, for example: Oracle’s Mission Control (free on non production 
>> clusters, no need to install anything if already using Oracle’s JVM), 
>> VisualVM (I think free), YourKit (paid). For each one of them there is a 
>> plenty of online support how to use them both for local and remote profiling.
>> 
>> Piotrek
>> 
>>> On 31 Oct 2019, at 14:05, Habib Mostafaei >> > wrote:
>>> 
>>> I enclosed all logs from the run and for this run I used parallelism one. 
>>> However, for other runs I checked and found that all parallel workers were 
>>> working properly. Is there a simple way to get profiling information in 
>>> Flink?
>>> 
>>> Best,
>>> 
>>> Habib
>>> 
>>> On 10/31/2019 2:54 AM, Zhenghua Gao wrote:
 I think more runtime information would help figure out where the problem 
 is.
 1) how many parallelisms actually working
 2) the metrics for each operator
 3) the jvm profiling information, etc
 
 Best Regards,
 Zhenghua Gao
 
 
 On Wed, Oct 30, 2019 at 8:25 PM Habib Mostafaei >>> > wrote:
 Thanks Gao for the reply. I used the parallelism parameter with different 
 values like 6 and 8 but still the execution time is not comparable with a 
 single threaded python script. What would be the reasonable value for the 
 parallelism?
 
 Best,
 
 Habib
 
 On 10/30/2019 1:17 PM, Zhenghua Gao wrote:
> The reason might be the parallelism of your task is only 1, that's too 
> low.
> See [1] to specify proper parallelism  for your job, and the execution 
> time should be reduced significantly.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html 
> 
> 
> Best Regards,
> Zhenghua Gao
> 
> 
> On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei  > wrote:
> Hi all,
> 
> I am running Flink on a standalone cluster and getting very long 
> execution time for the streaming queries like WordCount for a fixed text 
> file. My VM runs on a Debian 10 with 16 cpu cores and 32GB of RAM. I 
> have a text file with size of 2GB. When I run the Flink on a standalone 
> cluster, i.e., one JobManager and one taskManager with 25GB of heapsize, 
> it took around two hours to finish counting this file while a simple 
> python script can do it in around 7 minutes. Just wondering what is 
> wrong with my setup. I ran the experiments on a cluster with six 
> taskManagers, but I still get very long execution time like 25 minutes 
> or so. I tried to increase the JVM heap size to have lower execution 
> time but it did not help. I attached the log file and the Flink 
> configuration file to this email.
> 
> Best,
> 
> Habib
> 
>>> 
>>> 
>> 
> -- 
> Habib Mostafaei, Ph.D.
> Postdoctoral researcher
> TU Berlin,
> FG INET, MAR 4.003
> Marchstraße 23, 10587 Berlin



Re: Preserving (best effort) messages order between operators

2019-11-01 Thread Averell
Hi Yun,

I found the cause of the issue.
That ContinuousFileReaderOperator (my operator B) uses a PriorityQueue that
keeps its buffer sorted by modTime, so my records were re-ordered.
I don't understand the reason for using a PriorityQueue instead of an
ordinary FIFO queue, though.
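Just to illustrate the effect in plain Java (made-up modTime values, nothing
Flink-specific):

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.PriorityQueue;
import java.util.Queue;

Queue<Long> fifo = new ArrayDeque<>(Arrays.asList(3L, 1L, 2L));     // arrival order
Queue<Long> prio = new PriorityQueue<>(Arrays.asList(3L, 1L, 2L));  // same elements
// Draining fifo yields 3, 1, 2 (arrival order kept),
// draining prio yields 1, 2, 3 (smallest modTime first, arrival order lost).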

Thanks.
Averell 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: is Streaming Ledger open source?

2019-11-01 Thread kanth909
Got it! 

Sent from my iPhone

> On Nov 1, 2019, at 6:23 AM, Seth Wiesman  wrote:
> 
> Hi Kant,
> 
> Streaming Ledger is actively maintained but is not open source. That repo 
> contains the sdk which is open source along with a single threaded runner for 
> testing. The parallel execution engine is only offered as part of Ververica 
> Platform River edition. 
> 
> Seth 
> 
> https://docs.ververica.com/streaming_ledger/getting_started/index.html
> 
> 
>> On Fri, Nov 1, 2019 at 5:16 AM kant kodali  wrote:
>> Hi All,
>> 
>> Is https://github.com/dataArtisans/da-streamingledger an open-source 
>> project? Looks to me that this project is not actively maintained. is that 
>> correct? since the last commit is one year ago and it shows there are 0 
>> contributors?
>> 
>> Thanks!
> -- 
> Seth Wiesman | Solutions Architect
> +1 314 387 1463
> 
> 
> 
> Follow us @VervericaData
> --
> Join Flink Forward - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
> --
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: No FileSystem for scheme "file" for S3A in and state processor api in 1.9

2019-11-01 Thread spoganshev
No, I didn't because it's inconvenient for us to have 2 different docker
images for streaming and batch jobs.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: is Streaming Ledger open source?

2019-11-01 Thread kanth909
Got it! So will go with spark delta.

Sent from my iPhone

> On Nov 1, 2019, at 6:23 AM, Seth Wiesman  wrote:
> 
> Hi Kant,
> 
> Streaming Ledger is actively maintained but is not open source. That repo 
> contains the sdk which is open source along with a single threaded runner for 
> testing. The parallel execution engine is only offered as part of Ververica 
> Platform River edition. 
> 
> Seth 
> 
> https://docs.ververica.com/streaming_ledger/getting_started/index.html
> 
> 
>> On Fri, Nov 1, 2019 at 5:16 AM kant kodali  wrote:
>> Hi All,
>> 
>> Is https://github.com/dataArtisans/da-streamingledger an open-source 
>> project? Looks to me that this project is not actively maintained. is that 
>> correct? since the last commit is one year ago and it shows there are 0 
>> contributors?
>> 
>> Thanks!
> -- 
> Seth Wiesman | Solutions Architect
> +1 314 387 1463
> 
> 
> 
> Follow us @VervericaData
> --
> Join Flink Forward - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
> --
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen


Re: [ANNOUNCE] Progress of Apache Flink 1.10 #2

2019-11-01 Thread Steven Wu
Gary, FLIP-27 seems to have been omitted from the 2nd update. Below is the info
from update #1.

- FLIP-27: Refactor Source Interface [20]
-  FLIP accepted. Implementation is in progress.



On Fri, Nov 1, 2019 at 7:01 AM Gary Yao  wrote:

> Hi community,
>
> Because we have approximately one month of development time left until the
> targeted Flink 1.10 feature freeze, we thought now would be a good time to
> give another progress update. Below we have included a list of the ongoing
> efforts that have made progress since our last release progress update [1].
> As always, if you are working on something that is not included here, feel
> free to use this thread to share your progress.
>
> - Support Java 11 [2]
>     - Implementation is in progress (18/21 subtasks resolved)
>
> - Table API improvements
>     - Full Data Type Support in Planner [3]
>         - Implementing (1/8 subtasks resolved)
>     - FLIP-66 Support Time Attribute in SQL DDL [4]
>         - Implementation is in progress (1/7 subtasks resolved).
>     - FLIP-70 Support Computed Column [5]
>         - FLIP voting [6]
>     - FLIP-63 Rework Table Partition Support [7]
>         - Implementation is in progress (3/15 subtasks resolved).
>     - FLIP-51 Rework of Expression Design [8]
>         - Implementation is in progress (2/12 subtasks resolved).
>     - FLIP-64 Support for Temporary Objects in Table Module [9]
>         - Implementation is in progress
>
> - Hive compatibility completion (DDL/UDF) to support full Hive integration
>     - FLIP-57 Rework FunctionCatalog [10]
>         - Implementation is in progress (6/9 subtasks resolved)
>     - FLIP-68 Extend Core Table System with Modular Plugins [11]
>         - Implementation is in progress (2/8 subtasks resolved)
>
> - Finer grained resource management
>     - FLIP-49: Unified Memory Configuration for TaskExecutors [12]
>         - Implementation is in progress (6/10 subtasks resolved)
>     - FLIP-53: Fine Grained Operator Resource Management [13]
>         - Implementation is in progress (1/9 subtasks resolved)
>
> - Finish scheduler re-architecture [14]
>     - Integration tests are being enabled for new scheduler
>
> - Executor/Client refactoring [15]
>     - FLIP-81: Executor-related new ConfigOptions [16]
>         - done
>     - FLIP-73: Introducing Executors for job submission [17]
>         - Implementation is in progress
>
> - FLIP-36 Support Interactive Programming [18]
>     - Is built on top of FLIP-67 [19], which has been accepted
>     - Implementation in progress
>
> - FLIP-58: Flink Python User-Defined Stateless Function for Table [20]
>     - Implementation is in progress (12/22 subtask resolved)
> - FLIP-50: Spill-able Heap Keyed State Backend [21]
>     - Implementation is in progress (2/11 subtasks resolved)
>
> - RocksDB Backend Memory Control [22]
>     - FLIP for resource management on state backend will be opened soon
>     - Write Buffer Manager will be backported to FRocksDB due to
>       performance regression [23] in new RocksDB versions
>
> - Unaligned Checkpoints
>     - FLIP-76 [24] was published and received positive feedback
>     - Implementation is in progress
>
> - Separate framework and user class loader in per-job mode [25]
>     - First PR is almost done. Remaining PRs will be ready next week
>
> - Active Kubernetes Integration [26]
>     - Implementation is in progress (6/11 in review, 3/11 in progress,
>       2/11 todo)
>
> - FLIP-39 Flink ML pipeline and ML libs [27]
>     - A few abstract ML classes have been merged (FLINK-13339, FLINK-13513)
>     - Starting review of algorithms
>
> Again, the feature freeze is targeted to be at the end of November. Please
> make sure that all important work threads can be completed until that date.
> Feel free to use this thread to communicate any concerns about features that
> might not be finished until then. We will send another announcement later in
> the release cycle to make the date of the feature freeze official.
>
> Best,
> Yu & Gary
>
> [1] https://s.apache.org/wc0dc
> [2] https://issues.apache.org/jira/browse/FLINK-10725
> [3] https://issues.apache.org/jira/browse/FLINK-14079
> [4]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+time+attribute+in+SQL+DDL
> [5]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-70%3A+Flink+SQL+Computed+Column+Design
> [6]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-70-Flink-SQL-Computed-Column-Design-td34385.html
> [7]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-63%3A+Rework+table+partition+support
> [8]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-51%3A+Rework+of+the+Expression+Design
> [9]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module
> [10]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-57%3A+Rework+FunctionCatalog
> [11]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-68%3A+Extend+Core+Table+Syste

Re ordering events with flink

2019-11-01 Thread Vishwas Siravara
Hi guys,
I want to know if it's possible to sort events in a Flink data stream. I
know I can't sort a stream, but is there a way to buffer for a very short
time and sort those events before sending them to a data sink?

In our scenario we consume from a Kafka topic that has multiple partitions,
but the data is *not* partitioned by key (it is written round robin). For
example, we want to time-order transactions associated with a particular
account, but since different transactions for the same account end up in
different partitions at the source, they also end up in different task
managers and slots, and we cannot maintain event-time order in our stream
processing. We do partition by account number when we send the events to the
downstream Kafka sink, so transactions for the same account end up in the
same sink partition, but that is not good enough since the events are not
sorted at the source.
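What I have in mind is roughly the (untested) sketch below: key by account,
buffer each event under its event timestamp, and flush in timestamp order once
the watermark passes. It assumes a POJO Txn with an account field and that
event-time timestamps/watermarks are already assigned upstream.

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SortPerAccount extends KeyedProcessFunction<String, Txn, Txn> {

    // event timestamp -> all buffered transactions with that timestamp
    private transient MapState<Long, List<Txn>> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "buffer", Types.LONG, Types.LIST(Types.POJO(Txn.class))));
    }

    @Override
    public void processElement(Txn txn, Context ctx, Collector<Txn> out) throws Exception {
        long ts = ctx.timestamp();                      // assumes a timestamp was assigned upstream
        List<Txn> atTs = buffer.get(ts);
        if (atTs == null) {
            atTs = new ArrayList<>();
        }
        atTs.add(txn);
        buffer.put(ts, atTs);
        ctx.timerService().registerEventTimeTimer(ts);  // fires once the watermark passes ts
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Txn> out) throws Exception {
        // Event-time timers fire in ascending timestamp order per key,
        // so the output is emitted sorted per account.
        List<Txn> ready = buffer.get(timestamp);
        if (ready != null) {
            for (Txn t : ready) {
                out.collect(t);
            }
            buffer.remove(timestamp);
        }
    }
}

Usage would be something like
stream.keyBy(Txn::getAccount).process(new SortPerAccount()).addSink(kafkaSink);
events arriving behind the watermark would still need separate handling.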

Any ideas for doing this are much appreciated.


Best,
Vishwas


Re: Flink SQL + savepoint

2019-11-01 Thread Fanbin Bu
Kurt,

What do you recommend for using savepoints with Flink SQL?
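For reference, what I mean by assigning uids is the DataStream-level mechanism
(the operator and sink names below are made up), which doesn't seem to be
exposed through the Table/SQL API:

DataStream<Event> events = env.addSource(kafkaSource).uid("kafka-source");

events.keyBy(Event::getKey)
      .process(new MyAggregator()).uid("aggregator")   // MyAggregator: made-up KeyedProcessFunction
      .addSink(kafkaSink).uid("kafka-sink");

Stable uids are what let "flink run -s <savepointPath>" map the savepoint's
operator state back onto the job after it changes.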



On Thu, Oct 31, 2019 at 12:03 AM Yun Tang  wrote:

> Hi Fanbin
>
> If you do not change the parallelism or add and remove operators, you
> could still use savepoints to resume your jobs with Flink SQL.
>
> However, as far as I know, Flink SQL might not configure the uid currently,
> and I’m pretty sure the blink branch contains this part of setting the uid
> on the stream node. [1]
>
> Already CC'ing Kurt as he could provide more detailed information on this.
>
> [1]
> https://github.com/apache/flink/blob/blink/flink-libraries/flink-table/src/main/java/org/apache/flink/table/util/resource/StreamNodeUtil.java#L44
>
> Best
>
> Yun Tang
>
>
> From: Fanbin Bu 
> Date: Thursday, October 31, 2019 at 1:17 PM
> To: user 
> Subject: Flink SQL + savepoint
>
> Hi,
>
> It is highly recommended that we assign a uid to each operator for the
> sake of savepoints. How do we do this for Flink SQL? According to
> https://stackoverflow.com/questions/55464955/how-to-add-uid-to-operator-in-flink-table-api,
> it is not possible.
>
> Does that mean I can't use savepoints to restart my program if I use Flink
> SQL?
>
> Thanks,
>
> Fanbin
>


Re: [ANNOUNCE] Progress of Apache Flink 1.10 #2

2019-11-01 Thread Thomas Weise
Is there any activity on FLIP-27 that would make it relevant for the 1.10
release?

Thanks, Gary, for the update; it provides excellent visibility into current
activity and what we can expect in the release.


On Fri, Nov 1, 2019 at 1:52 PM Steven Wu  wrote:

> Gary,  FLIP-27 seems to get omitted in the 2nd update. below is the info
> from update #1.
>
> - FLIP-27: Refactor Source Interface [20]
> -  FLIP accepted. Implementation is in progress.
>
>
>
> On Fri, Nov 1, 2019 at 7:01 AM Gary Yao  wrote:
>
> > Hi community,
> >
> > Because we have approximately one month of development time left until
> the
> > targeted Flink 1.10 feature freeze, we thought now would be a good time
> to
> > give another progress update. Below we have included a list of the
> ongoing
> > efforts that have made progress since our last release progress update
> > [1]. As
> > always, if you are working on something that is not included here, feel
> > free
> > to use this thread to share your progress.
> >
> > - Support Java 11 [2]
> > - Implementation is in progress (18/21 subtasks resolved)
> >
> > - Table API improvements
> > - Full Data Type Support in Planner [3]
> > - Implementing (1/8 subtasks resolved)
> > - FLIP-66 Support Time Attribute in SQL DDL [4]
> > - Implementation is in progress (1/7 subtasks resolved).
> > - FLIP-70 Support Computed Column [5]
> > - FLIP voting [6]
> > - FLIP-63 Rework Table Partition Support [7]
> > - Implementation is in progress (3/15 subtasks resolved).
> > - FLIP-51 Rework of Expression Design [8]
> > - Implementation is in progress (2/12 subtasks resolved).
> > - FLIP-64 Support for Temporary Objects in Table Module [9]
> > - Implementation is in progress
> >
> > - Hive compatibility completion (DDL/UDF) to support full Hive
> integration
> > - FLIP-57 Rework FunctionCatalog [10]
> > - Implementation is in progress (6/9 subtasks resolved)
> > - FLIP-68 Extend Core Table System with Modular Plugins [11]
> > - Implementation is in progress (2/8 subtasks resolved)
> >
> > - Finer grained resource management
> > - FLIP-49: Unified Memory Configuration for TaskExecutors [12]
> > - Implementation is in progress (6/10 subtasks resolved)
> > - FLIP-53: Fine Grained Operator Resource Management [13]
> > - Implementation is in progress (1/9 subtasks resolved)
> >
> > - Finish scheduler re-architecture [14]
> > - Integration tests are being enabled for new scheduler
> >
> > - Executor/Client refactoring [15]
> > - FLIP-81: Executor-related new ConfigOptions [16]
> > - done
> > - FLIP-73: Introducing Executors for job submission [17]
> > - Implementation is in progress
> >
> > - FLIP-36 Support Interactive Programming [18]
> > - Is built on top of FLIP-67 [19], which has been accepted
> > - Implementation in progress
> >
> > - FLIP-58: Flink Python User-Defined Stateless Function for Table [20]
> > - Implementation is in progress (12/22 subtask resolved)
> > - FLIP-50: Spill-able Heap Keyed State Backend [21]
> > - Implementation is in progress (2/11 subtasks resolved)
> >
> > - RocksDB Backend Memory Control [22]
> > - FLIP for resource management on state backend will be opened soon
> > - Write Buffer Manager will be backported to FRocksDB due to
> > performance regression [23] in new RocksDB versions
> >
> > - Unaligned Checkpoints
> > - FLIP-76 [24] was published and received positive feedback
> > - Implementation is in progress
> >
> > - Separate framework and user class loader in per-job mode [25]
> > - First PR is almost done. Remaining PRs will be ready next week
> >
> > - Active Kubernetes Integration [26]
> > - Implementation is in progress (6/11 in review, 3/11 in progress,
> > 2/11 todo)
> >
> > - FLIP-39 Flink ML pipeline and ML libs [27]
> > - A few abstract ML classes have been merged (FLINK-13339,
> > FLINK-13513)
> > - Starting review of algorithms
> >
> > Again, the feature freeze is targeted to be at the end of November.
> Please
> > make sure that all important work threads can be completed until that
> date.
> > Feel free to use this thread to communicate any concerns about features
> > that
> > might not be finished until then. We will send another announcement later
> > in
> > the release cycle to make the date of the feature freeze official.
> >
> > Best,
> > Yu & Gary
> >
> > [1] https://s.apache.org/wc0dc
> > [2] https://issues.apache.org/jira/browse/FLINK-10725
> > [3] https://issues.apache.org/jira/browse/FLINK-14079
> > [4]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+time+attribute+in+SQL+DDL
> > [5]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-70%3A+Flink+SQL+Computed+Column+Design
> > [6]
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-70-Flink-SQL-Computed-Column-Design-td34385.html
> > [7]
> >
> htt