[GitHub] [flink] zhijiangW commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-04 Thread GitBox
zhijiangW commented on a change in pull request #8242: [FLINK-6227][network] 
Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#discussion_r281005469
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ##
 @@ -162,10 +164,13 @@ void assignExclusiveSegments(List<MemorySegment> segments) {
 	public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
 		if (partitionRequestClient == null) {
 			// Create a client and request the partition
-			partitionRequestClient = connectionManager
-				.createPartitionRequestClient(connectionId);
+			try {
+				partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
+			} catch (RemoteTransportException ex) {
 
 Review comment:
   It should catch `IOException` here and wrap it only when it is a `RemoteTransportException` instance.
   In the case of a `LocalTransportException` there should be no need to restart the producer side.
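   For illustration, a minimal self-contained sketch of the suggested handling (the exception types below are stand-ins, not Flink's actual classes):

```java
import java.io.IOException;

// Sketch: catch IOException, but wrap only remote transport failures into the
// exception that triggers a producer restart; local failures are rethrown as-is.
public class WrapOnlyRemoteFailures {

	static class RemoteTransportException extends IOException {
		RemoteTransportException(String msg) { super(msg); }
	}

	static class LocalTransportException extends IOException {
		LocalTransportException(String msg) { super(msg); }
	}

	// Stand-in for the exception introduced by the PR.
	static class DataConsumptionException extends IOException {
		DataConsumptionException(Throwable cause) { super(cause); }
	}

	static void requestSubpartition(boolean remoteFailure) throws IOException {
		try {
			// connectionManager.createPartitionRequestClient(connectionId) would go here;
			// simulate the two possible transport failures instead.
			if (remoteFailure) {
				throw new RemoteTransportException("producer unreachable");
			} else {
				throw new LocalTransportException("local netty failure");
			}
		} catch (IOException e) {
			if (e instanceof RemoteTransportException) {
				// Only a remote failure justifies restarting the producer side.
				throw new DataConsumptionException(e);
			}
			throw e;
		}
	}

	public static void main(String[] args) {
		for (boolean remote : new boolean[] {true, false}) {
			try {
				requestSubpartition(remote);
			} catch (IOException e) {
				String cause = e.getCause() == null ? "none" : e.getCause().getClass().getSimpleName();
				System.out.println(e.getClass().getSimpleName() + " (wrapped cause: " + cause + ")");
			}
		}
	}
}
```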


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8230: [FLINK-10977][table] Add streaming non-window FlatAggregate to Table API

2019-05-04 Thread GitBox
sunjincheng121 commented on issue #8230: [FLINK-10977][table] Add streaming 
non-window FlatAggregate to Table API
URL: https://github.com/apache/flink/pull/8230#issuecomment-489392533
 
 
   Thanks for creating the JIRA @hequn8128!
   +1 to merge.
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12401) Support incremental emit for non-window streaming FlatAggregate on Table API

2019-05-04 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833223#comment-16833223
 ] 

Hequn Cheng commented on FLINK-12401:
-

[~jark] Hi, great that you are also interested in this.

This JIRA is going to support emitting incremental values in either Acc or AccRetract mode. Whether Acc or AccRetract is used will be decided by the query.

As for local-global, I think you raised a good point. Similar to the normal Aggregate, flatAggregate could also support local-global and miniBatch. However, I plan to support these optimizations later.

What do you think?
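
As a rough illustration (stand-in code only, not the actual Table API), the difference between emitting full values and emitting incremental values for a Top-2 style flatAggregate:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Sketch: full emit re-sends the whole result after every update, while
// incremental emit sends only the changed rows (with retractions in AccRetract mode).
public class EmitModesSketch {

	static List<String> emitFull(int first, int second) {
		List<String> out = new ArrayList<>();
		out.add("+ first=" + first);
		out.add("+ second=" + second);
		return out;
	}

	static List<String> emitIncremental(int oldFirst, int oldSecond, int first, int second) {
		List<String> out = new ArrayList<>();
		if (first != oldFirst) {
			out.add("- first=" + oldFirst);   // retraction (AccRetract mode)
			out.add("+ first=" + first);
		}
		if (second != oldSecond) {
			out.add("- second=" + oldSecond);
			out.add("+ second=" + second);
		}
		return out;
	}

	public static void main(String[] args) {
		// A Top-2 accumulator goes from (10, 7) to (12, 10) after the element 12 arrives.
		System.out.println("full:        " + emitFull(12, 10));
		System.out.println("incremental: " + emitIncremental(10, 7, 12, 10));
	}
}
{code}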

> Support incremental emit for non-window streaming FlatAggregate on Table API
> 
>
> Key: FLINK-12401
> URL: https://issues.apache.org/jira/browse/FLINK-12401
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As described in 
> [Flip-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739],
>  there are two output modes for non-window streaming flatAggregate: one 
> emits full values, the other emits incremental values. 
> [FLINK-10977|https://issues.apache.org/jira/browse/FLINK-10977] supports the 
> former; this JIRA is going to support the latter. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] hequn8128 commented on issue #8230: [FLINK-10977][table] Add streaming non-window FlatAggregate to Table API

2019-05-04 Thread GitBox
hequn8128 commented on issue #8230: [FLINK-10977][table] Add streaming 
non-window FlatAggregate to Table API
URL: https://github.com/apache/flink/pull/8230#issuecomment-489390597
 
 
   @sunjincheng121 Thank you for the reminder. I have created another JIRA ([FLINK-12401](https://issues.apache.org/jira/browse/FLINK-12401)) for the incremental mode.
   
   Best, Hequn


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281004053
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+	private final SchedulerOperations schedulerOperations;
+
+	private final SchedulingTopology schedulingTopology;
+
+	private final DeploymentOption deploymentOption = new DeploymentOption(true);
+
+	private final JobGraph jobGraph;
+
+	public LazyFromSourcesSchedulingStrategy(
+			SchedulerOperations schedulerOperations,
+			SchedulingTopology schedulingTopology,
+			JobGraph jobGraph) {
+		this.schedulerOperations = checkNotNull(schedulerOperations);
+		this.schedulingTopology = checkNotNull(schedulingTopology);
+		this.jobGraph = checkNotNull(jobGraph);
+	}
+
+	@Override
+	public void startScheduling() {
+		List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new LinkedList<>();
+		for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) {
+			if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+				// schedule vertices without consumed result partition
+				executionVertexDeploymentOptions.add(
+					new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+			}
+		}
+		if (!executionVertexDeploymentOptions.isEmpty()) {
+			schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+		}
+	}
+
+	@Override
+	public void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart) {
+		List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size());
+		for (ExecutionVertexID executionVertexID : verticesNeedingRestart) {
+			executionVertexDeploymentOptions.add(
+				new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+		}
+		schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+	}
+
+	@Override
+	public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) {
 
 Review comment:
   Using vertex state transitions to schedule vertices has the benefit of avoiding a flood of `onPartitionConsumable` notifications, but there may be operators that take a very long time before they produce any result in PIPELINED shuffle mode. So I think we could keep both benefits by relying on both vertex state transitions and `onPartitionConsumable` notifications:
   1. `DeploymentOption#sendScheduleOrUpdateConsumerMessage` is set to true only if the vertex has a PIPELINED produced result partition.
   2. Schedule vertices with BLOCKING input result partitions using vertex state transitions.
   3. Schedule vertices with 

[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-04 Thread GitBox
zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-489389513
 
 
   Thanks for your professional reviews and valuable suggestions @tillrohrmann .
   
   I think there are mainly two issues to be confirmed:
   
   1. Do we need a new, dedicated `DataConsumptionException`, or can we use the existing `PartitionNotFoundException`?
   
   Semantically, `PartitionNotFoundException` is actually a subtype of `DataConsumptionException`. Functionally they currently provide the same information, so I agree to maintain only one exception type for simplicity. That means either using `PartitionNotFoundException` directly or renaming it to cover the broader semantics.
   
   2. In which cases should we report this `PartitionNotFoundException` so that the JobMaster restarts the producer?
   
   The cases you mentioned above are very helpful, and I will try to specify each of them further:
   
   -> TaskExecutor no longer reachable
   - TE unavailable while requesting the sub partition: an executor shutdown/kill would cause an exception when establishing the network connection or requesting the sub partition, which is covered by `RemoteInputChannel#requestSubpartition` in the PR.
   
   - TE unavailable while transferring partition data: an executor shutdown/kill would make the network channel inactive, which is covered by `RemoteInputChannel#onError` in the PR.
   
   -> TaskExecutor reachable but result partition not available
   
   This is covered by `RemoteInputChannel#failPartitionRequest` in the PR.
   
   -> TaskExecutor reachable, result partition available, but the sub partition view cannot be created (the spilled file has been removed)
   
   The producer would respond with an `ErrorResponse` to the sub partition request, and the consumer would then call `RemoteInputChannel#onError` to cover this case. Because this `ErrorResponse` is a fatal error, it would make all the remote input channels report `DataConsumptionException`. So it is probably better to wrap `DataConsumptionException` on the producer side in this case, to avoid affecting all the remote input channels.
   
   Besides the above three cases, there is another one: the partition data gets corrupted/deleted while being transferred, which is covered by `PartitionRequestQueue#exceptionCaught` in the PR ATM. The consumer would then receive an `ErrorResponse` and call `RemoteInputChannel#onError`.
   
   For this case I once considered wrapping `DataConsumptionException` directly on the producer side, specifically in `SpilledSubpartitionView#getNextBuffer`, to avoid reusing the general `ErrorResponse`, because one could argue that some internal network exceptions delivered via `ErrorResponse` do not need a producer restart. My previous thought was that some network exceptions, including environment/hardware issues, might also justify restarting the producer on another executor; otherwise the consumer might hit the same problem again after restarting and consuming from the previous executor. So the `ErrorResponse` with the fatal property can easily be reused to represent the partition problem here. Alternatively, we could treat network environment/hardware problems as a separate issue to solve, and wrap the dedicated `DataConsumptionException` in the specific views to distinguish it from the existing `ErrorResponse`.
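   
   For illustration, a minimal self-contained sketch (stand-in types, not the PR's actual classes) of the producer-side option discussed above, i.e. surfacing a failed read of spilled subpartition data as a partition-level exception instead of a generic `ErrorResponse`:

```java
import java.io.IOException;

// Sketch: wrap a read failure of spilled subpartition data into a
// partition-level exception so that only the affected producer is restarted.
public class ProducerSideWrappingSketch {

	// Stand-in for the exception that tells the JobMaster to restart the producer.
	static class DataConsumptionException extends IOException {
		DataConsumptionException(String partitionId, Throwable cause) {
			super("Partition " + partitionId + " is not consumable", cause);
		}
	}

	interface SpilledView {
		byte[] getNextBuffer() throws IOException;
	}

	static byte[] readNextBuffer(SpilledView view, String partitionId) throws IOException {
		try {
			return view.getNextBuffer();
		} catch (IOException e) {
			// Spilled file corrupted/deleted: report it as a data consumption
			// failure of this partition rather than a general network error.
			throw new DataConsumptionException(partitionId, e);
		}
	}

	public static void main(String[] args) {
		SpilledView brokenView = () -> {
			throw new IOException("spilled file deleted");
		};
		try {
			readNextBuffer(brokenView, "partition-0");
		} catch (IOException e) {
			System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
		}
	}
}
```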
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services



[jira] [Commented] (FLINK-12348) Use TableConfig in api module to replace TableConfig in blink-planner module.

2019-05-04 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833212#comment-16833212
 ] 

Jark Wu commented on FLINK-12348:
-

Hi [~twalthr], I find that the updates to TableConfig mentioned in FLIP-32 are 
mainly about a builder pattern. But the most important change Blink introduced is 
making TableConfig configurable using plain key-value pairs (similar to the runtime 
options in {{flink-conf.yaml}}), so that we can set up a cluster or a job using 
yaml files, for example:

{code:java}
sql.timeZone: UTC
sql.codegen.length.max: 64000
{code}

In order to support this feature, we need an 
{{org.apache.flink.configuration.Configuration}} member in {{TableConfig}}, and we 
need to move {{TableConfigOptions}} into the api module. What do you think? [~twalthr] 
[~hequn8128] [~dawidwys]
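
A minimal sketch of what such a key-value backed TableConfig could look like (an 
illustration of the idea only, not the final design; the keys are the ones from the 
example above):

{code:java}
// Sketch: a TableConfig whose options live in a plain key-value Configuration,
// so the same keys can also be loaded from a yaml file.
import org.apache.flink.configuration.Configuration;

public class KeyValueTableConfig {

	private final Configuration conf = new Configuration();

	public void setString(String key, String value) {
		conf.setString(key, value);
	}

	public String getTimeZone() {
		return conf.getString("sql.timeZone", "UTC");
	}

	public int getMaxGeneratedCodeLength() {
		return conf.getInteger("sql.codegen.length.max", 64000);
	}

	public static void main(String[] args) {
		KeyValueTableConfig config = new KeyValueTableConfig();
		config.setString("sql.timeZone", "GMT+8");
		System.out.println(config.getTimeZone());               // GMT+8
		System.out.println(config.getMaxGeneratedCodeLength()); // 64000
	}
}
{code}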
 

> Use TableConfig in api module to replace TableConfig in blink-planner module.
> -
>
> Key: FLINK-12348
> URL: https://issues.apache.org/jira/browse/FLINK-12348
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / API
>Reporter: Jing Zhang
>Assignee: Jing Zhang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Since TableConfig has already been moved to the API module in 
> [FLINK-11067|https://issues.apache.org/jira/browse/FLINK-11067], the TableConfig 
> in the blink-planner module should not exist anymore. This issue aims to remove 
> the TableConfig in the blink-planner module and use the TableConfig in the API module 
> instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11820) SimpleStringSchema handle message record which value is null

2019-05-04 Thread Yu Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Li updated FLINK-11820:
--
Affects Version/s: 1.8.0
Fix Version/s: (was: 1.7.3)
   Issue Type: Bug  (was: Improvement)

> SimpleStringSchema handle message record which value is null
> 
>
> Key: FLINK-11820
> URL: https://issues.apache.org/jira/browse/FLINK-11820
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2, 1.8.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Critical
>
> When the Kafka message queue contains records whose value is null, the 
> flink-kafka-connector can't process these records.
> For example, a message queue like the one below:
> |msg|{color:#ff}null{color}|msg|msg|msg|msg|
> Normally, +SimpleStringSchema+ is used to process the message queue data:
> {code:java}
> env.addSource(new FlinkKafkaConsumer010("topic", new SimpleStringSchema(), 
> properties));
> {code}
> but this will throw a NullPointerException:
> {code:java}
> java.lang.NullPointerException
>   at java.lang.String.<init>(String.java:515)
>   at 
> org.apache.flink.api.common.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:75)
>   at 
> org.apache.flink.api.common.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:36)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] carp84 commented on issue #7987: [FLINK-11820][serialization] SimpleStringSchema handle message record which value is null

2019-05-04 Thread GitBox
carp84 commented on issue #7987: [FLINK-11820][serialization] 
SimpleStringSchema handle message record which value is null
URL: https://github.com/apache/flink/pull/7987#issuecomment-489388048
 
 
   @aljoscha @tzulitai Mind taking a look here? Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-12400) NullpointerException using SimpleStringSchema with Kafka

2019-05-04 Thread Yu Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Li resolved FLINK-12400.
---
Resolution: Duplicate

> NullpointerException using SimpleStringSchema with Kafka
> 
>
> Key: FLINK-12400
> URL: https://issues.apache.org/jira/browse/FLINK-12400
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Affects Versions: 1.7.2, 1.8.0
> Environment: Flink 1.7.2 job on 1.8 cluster
> Kafka 0.10 with a topic in log-compaction
>Reporter: Pierre Zemb
>Assignee: Pierre Zemb
>Priority: Minor
>
> Hi!
> Yesterday, we saw a strange behavior with our Flink job and Kafka. We are 
> consuming a Kafka topic set up in 
> [log-compaction|https://kafka.apache.org/documentation/#compaction] mode. As 
> such, sending a message with a null payload acts like a tombstone.
> We are consuming Kafka like this:
> {code:java}
> new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), 
> this.kafkaProperties)
> {code}
> When we sent such a message, the job failed because of a NullPointerException 
> [here|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java#L75].
> `byte[] message` was null, causing the NPE. 
> We forked the class and added a basic null check that returns null in that case. 
> It fixed our issue. 
> Should we add it to the main class?
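
A minimal sketch of such a null check, written here as a subclass used in place of 
{{new SimpleStringSchema()}} (just an illustration; the actual fix may instead go 
directly into {{SimpleStringSchema#deserialize}}):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;

// Sketch: return null for Kafka log-compaction tombstones instead of letting
// new String(null) throw a NullPointerException.
public class NullTolerantStringSchema extends SimpleStringSchema {

	private static final long serialVersionUID = 1L;

	@Override
	public String deserialize(byte[] message) {
		if (message == null) {
			// Tombstone record: there is no payload to decode.
			return null;
		}
		return super.deserialize(message);
	}
}
{code}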



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002555
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   JobVertex jobVertex1 = new JobVertex("vertex#1");
+   JobVertex jobVertex2 = new JobVertex("vertex#2");
+   JobGraph graph = new JobGraph(jobVertex1, jobVertex2);
+   jobVertex2.connectNewDataSetAsInput(jobVertex1, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+
+   for (int i = 0; i < 3; i++) {
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex1.getID(), i));
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex2.getID(), i));
+   }
+
+   TestingSchedulerOperation testingSchedulerOperation = new 
TestingSchedulerOperation();
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = new 
LazyFromSourcesSchedulingStrategy(
+   testingSchedulerOperation,
+   schedulingTopology,
+   graph);
+
+   schedulingStrategy.startScheduling();
+
+   assertEquals(3, 
testingSchedulerOperation.getScheduledVertices().size());
+   }
+
+   /**
+* Tests that when on execution state change will start available 
downstream vertices.
+* vertex#0vertex#1
+*   \ /
+*\   /
+* \ /
+*  (BLOCKING, ALL)
+* vertex#3  vertex#2
+*   \/
+*\  /
+* \/
+*  (BLOCKING, ANY)
+* vertex#4
+*|
+*|
+*|
+*(PIPELINED)
+* vertex#5
+*/
+   @Test
+   public void testOnExecutionStateChange() {
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+   JobVertex[] jobVertices = new JobVertex[6];
+
+   for (int i = 0; i < 6; i++) {
+   jobVertices[i] = new JobVertex("vertex#" + i);
+   }
+
+   jobVertices[3].connectNewDataSetAsInput(jobVertices[0], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   jobVertices[3].connectNewDataSetAsInput(jobVertices[1], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   jobVertices[4].connectNewDataSetAsInput(jobVertices[3], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

[GitHub] [flink] WeiZhong94 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support

2019-05-04 Thread GitBox
WeiZhong94 commented on a change in pull request #8267: 
[FLINK-12311][table][python] Add base python framework and Add Scan, 
Projection, and Filter operator support
URL: https://github.com/apache/flink/pull/8267#discussion_r281002536
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -0,0 +1,117 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from py4j.java_gateway import get_method
+
+__all__ = ['Table']
+
+
+class Table(object):
+
+"""
+A :class:`Table` is the core component of the Table API.
+Similar to how the batch and streaming APIs have DataSet and DataStream,
+the Table API is built around :class:`Table`.
+
+Use the methods of :class:`Table` to transform data.
+
+Example:
+::
+>>> t_config = 
TableConfig.Builder().as_streaming_execution().set_parallelism(1).build()
+>>> t_env = TableEnvironment.get_table_environment(t_config)
+>>> ...
+>>> t = t_env.scan("source")
+>>> t.select(...)
+...
+>>> t.insert_into("print")
+>>> t_env.execute()
+
+Operations such as :func:`~pyflink.table.Table.join`, 
:func:`~pyflink.table.Table.select`,
+:func:`~pyflink.table.Table.where` and 
:func:`~pyflink.table.Table.group_by`
+take arguments in an expression string. Please refer to the documentation 
for
+the expression syntax.
+"""
+
+def __init__(self, j_table):
+self._j_table = j_table
+
+def select(self, fields):
+"""
+Performs a selection operation. Similar to a SQL SELECT statement. The 
field expressions
+can contain complex expressions.
+
+Example:
+::
+>>> t = tab.select("key, value + 'hello'")
 
 Review comment:
   This point makes sense. I'll adjust these documents to be consistent with the Scala documents.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002374
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   JobVertex jobVertex1 = new JobVertex("vertex#1");
+   JobVertex jobVertex2 = new JobVertex("vertex#2");
+   JobGraph graph = new JobGraph(jobVertex1, jobVertex2);
+   jobVertex2.connectNewDataSetAsInput(jobVertex1, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+
+   for (int i = 0; i < 3; i++) {
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex1.getID(), i));
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex2.getID(), i));
+   }
+
+   TestingSchedulerOperation testingSchedulerOperation = new 
TestingSchedulerOperation();
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = new 
LazyFromSourcesSchedulingStrategy(
+   testingSchedulerOperation,
+   schedulingTopology,
+   graph);
+
+   schedulingStrategy.startScheduling();
+
+   assertEquals(3, 
testingSchedulerOperation.getScheduledVertices().size());
+   }
+
+   /**
+* Tests that when on execution state change will start available 
downstream vertices.
+* vertex#0vertex#1
+*   \ /
+*\   /
+* \ /
+*  (BLOCKING, ALL)
+* vertex#3  vertex#2
+*   \/
+*\  /
+* \/
+*  (BLOCKING, ANY)
+* vertex#4
+*|
+*|
+*|
+*(PIPELINED)
+* vertex#5
+*/
+   @Test
+   public void testOnExecutionStateChange() {
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+   JobVertex[] jobVertices = new JobVertex[6];
+
+   for (int i = 0; i < 6; i++) {
+   jobVertices[i] = new JobVertex("vertex#" + i);
+   }
+
+   jobVertices[3].connectNewDataSetAsInput(jobVertices[0], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   jobVertices[3].connectNewDataSetAsInput(jobVertices[1], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+   jobVertices[4].connectNewDataSetAsInput(jobVertices[3], 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002364
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@link LazyFromSourcesSchedulingStrategy}.
+ */
+public class LazyFromSourcesSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling lazy from sources scheduling 
strategy will start input vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   JobVertex jobVertex1 = new JobVertex("vertex#1");
+   JobVertex jobVertex2 = new JobVertex("vertex#2");
+   JobGraph graph = new JobGraph(jobVertex1, jobVertex2);
+   jobVertex2.connectNewDataSetAsInput(jobVertex1, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
+
+   for (int i = 0; i < 3; i++) {
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex1.getID(), i));
+   schedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertex2.getID(), i));
+   }
+
+   TestingSchedulerOperation testingSchedulerOperation = new 
TestingSchedulerOperation();
+   LazyFromSourcesSchedulingStrategy schedulingStrategy = new 
LazyFromSourcesSchedulingStrategy(
+   testingSchedulerOperation,
+   schedulingTopology,
+   graph);
+
+   schedulingStrategy.startScheduling();
+
+   assertEquals(3, 
testingSchedulerOperation.getScheduledVertices().size());
+   }
+
+   /**
+* Tests that when on execution state change will start available 
downstream vertices.
+* vertex#0vertex#1
+*   \ /
+*\   /
+* \ /
+*  (BLOCKING, ALL)
+* vertex#3  vertex#2
+*   \/
+*\  /
+* \/
+*  (BLOCKING, ANY)
+* vertex#4
+*|
+*|
+*|
+*(PIPELINED)
+* vertex#5
+*/
+   @Test
+   public void testOnExecutionStateChange() {
+   TestingSchedulingTopology schedulingTopology = new 
TestingSchedulingTopology();
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002357
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002317
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map 
intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List 
executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection partitions = 
schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set verticesNeedingRestart) 
{
+   List 
executionVertexDeploymentOptions = new 
ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002177
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection<SchedulingResultPartition> partitions = schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart) {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002120
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection<SchedulingResultPartition> partitions = schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart) {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002108
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+
+/**
+ * Simple implementation of {@link SchedulingResultPartition} for testing.
+ */
+public class TestingSchedulingResultPartition implements 
SchedulingResultPartition {
+   private final IntermediateDataSetID intermediateDataSetID;
+
+   private final IntermediateResultPartitionID 
intermediateResultPartitionID;
+
+   private final ResultPartitionType partitionType;
+
+   private final SchedulingVertex producer;
+
+   private Collection<SchedulingVertex> consumers;
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002116
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection<SchedulingResultPartition> partitions = schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002114
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection<SchedulingResultPartition> partitions = schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002106
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java
 ##
 @@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+
+/**
+ * Simple implementation of {@link SchedulingResultPartition} for testing.
+ */
+public class TestingSchedulingResultPartition implements 
SchedulingResultPartition {
+   private final IntermediateDataSetID intermediateDataSetID;
+
+   private final IntermediateResultPartitionID 
intermediateResultPartitionID;
+
+   private final ResultPartitionType partitionType;
+
+   private final SchedulingVertex producer;
+
+   private Collection<SchedulingVertex> consumers;
+
+   TestingSchedulingResultPartition(IntermediateDataSetID dataSetID,
+   IntermediateResultPartitionID partitionID, 
ResultPartitionType type, SchedulingVertex producer,
+   Collection<SchedulingVertex> consumers) {
+   this.intermediateDataSetID = dataSetID;
+   this.intermediateResultPartitionID = partitionID;
+   this.partitionType = type;
+   this.producer = producer;
+   this.consumers = consumers;
+   }
+
+   @Override
+   public IntermediateResultPartitionID getId() {
+   return intermediateResultPartitionID;
+   }
+
+   @Override
+   public IntermediateDataSetID getResultId() {
+   return intermediateDataSetID;
+   }
+
+   @Override
+   public ResultPartitionType getPartitionType() {
+   return partitionType;
+   }
+
+   @Override
+   public ResultPartitionState getState() {
+   return ResultPartitionState.DONE;
+   }
+
+   @Override
+   public SchedulingVertex getProducer() {
+   return producer;
+   }
+
+   @Override
+   public Collection<SchedulingVertex> getConsumers() {
+   return consumers;
+   }
+
+   public void setConsumers(Collection<SchedulingVertex> consumers) {
+   this.consumers = consumers;
 
 Review comment:
   OK


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281002110
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection<SchedulingResultPartition> partitions = schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart) {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] sunjincheng121 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support

2019-05-04 Thread GitBox
sunjincheng121 commented on a change in pull request #8267: 
[FLINK-12311][table][python] Add base python framework and Add Scan, 
Projection, and Filter operator support
URL: https://github.com/apache/flink/pull/8267#discussion_r281002095
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/pyflink2.sh
 ##
 @@ -0,0 +1,48 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# =
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+FLINK_CLASSPATH=`constructFlinkClassPath`
+
+ARGS=()
+
+while [[ $# -gt 0 ]]
+do
+key="$1"
+case $key in
+-c|--class)
+DRIVER=$2
+shift
+shift
+;;
+*)
+   ARGS+=("$1")
+   shift
+   ;;
+esac
+done
+
+PYTHON_JAR_PATH=`echo "$FLINK_ROOT_DIR"/opt/flink-python-*.jar`
+TABLE_JAR_PATH=`echo "$FLINK_ROOT_DIR"/opt/flink-table*.jar`
+exec $JAVA_RUN $JVM_ARGS -cp 
${FLINK_CLASSPATH}:${TABLE_JAR_PATH}:${PYTHON_JAR_PATH} ${DRIVER} ${ARGS[@]}
 
 Review comment:
   Makes sense. If we change `pyflink2.sh` to `pyflink-gateway-server.sh`, users will never use it directly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281001965
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection<SchedulingResultPartition> partitions = schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart) {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281001882
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
 
 Review comment:
   Yes, `schedulingVertex.getConsumedResultPartitions().isEmpty()` would work.
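   
   As a rough sketch (not the PR code) of how the loop could look with that check, assuming `getConsumedResultPartitions()` never returns null:
   
       for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) {
           // a vertex without consumed result partitions is a source and can be scheduled right away
           if (schedulingVertex.getConsumedResultPartitions().isEmpty()) {
               executionVertexDeploymentOptions.add(
                   new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
           }
       }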


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281001867
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection<SchedulingResultPartition> partitions = schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart) {
 
 Review comment:
   OK, I had thought the `verticesNeedingRestart` passed to `restartTasks` were already well chosen for direct scheduling.
   There is another small implementation difference between `startScheduling` and `restartTasks`: the former schedules only those vertices in the topology that have no consumed result partition, and the 
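   
   To make the difference concrete, a rough sketch based on the code quoted above (`options` is just shorthand for the local list of ExecutionVertexDeploymentOption, and the source check uses the `getConsumedResultPartitions().isEmpty()` form discussed in this review):
   
       // startScheduling: only the source vertices of the whole topology are deployed
       for (SchedulingVertex vertex : schedulingTopology.getVertices()) {
           if (vertex.getConsumedResultPartitions().isEmpty()) {
               options.add(new ExecutionVertexDeploymentOption(vertex.getId(), deploymentOption));
           }
       }
   
       // restartTasks: exactly the given vertices are deployed, regardless of their inputs
       for (ExecutionVertexID executionVertexID : verticesNeedingRestart) {
           options.add(new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
       }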

[jira] [Assigned] (FLINK-10976) Add Aggregate operator to Table API

2019-05-04 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reassigned FLINK-10976:
---

Assignee: Hequn Cheng

> Add Aggregate operator to Table API
> ---
>
> Key: FLINK-10976
> URL: https://issues.apache.org/jira/browse/FLINK-10976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: sunjincheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Add Aggregate operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>  .groupBy('a) // leave out groupBy-clause to define global aggregates
>  .agg(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
>  .select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8343: [hotfix][runtime] Remove useless code in JSONGenerator

2019-05-04 Thread GitBox
flinkbot commented on issue #8343: [hotfix][runtime] Remove useless code in 
JSONGenerator
URL: https://github.com/apache/flink/pull/8343#issuecomment-489383552
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-04 Thread GitBox
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r281001725
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
 ##
 @@ -187,8 +186,6 @@ private void decorateNode(Integer vertexID, ObjectNode 
node) {
node.put(PACT, "Operator");
}
 
-   StreamOperator operator = 
streamGraph.getStreamNode(vertexID).getOperator();
 
 Review comment:
   Yes, new pr: https://github.com/apache/flink/pull/8343


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi opened a new pull request #8343: [hotfix][runtime] Remove useless code in JSONGenerator

2019-05-04 Thread GitBox
JingsongLi opened a new pull request #8343: [hotfix][runtime] Remove useless 
code in JSONGenerator
URL: https://github.com/apache/flink/pull/8343
 
 
   ## What is the purpose of the change
   
   Remove useless code in JSONGenerator for 
https://github.com/apache/flink/pull/8295
   
   ## Verifying this change
   none
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10976) Add Aggregate operator to Table API

2019-05-04 Thread Dian Fu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-10976:
---

Assignee: (was: Dian Fu)

> Add Aggregate operator to Table API
> ---
>
> Key: FLINK-10976
> URL: https://issues.apache.org/jira/browse/FLINK-10976
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: sunjincheng
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Add Aggregate operator to Table API as described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>  .groupBy('a) // leave out groupBy-clause to define global aggregates
>  .agg(fun: AggregateFunction)  // output has columns 'a, 'b, 'c
>  .select('a, 'c)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] dianfu closed pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API

2019-05-04 Thread GitBox
dianfu closed pull request #7235: [FLINK-10976] [table] Add support for 
aggregate to table API
URL: https://github.com/apache/flink/pull/7235
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-04 Thread GitBox
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] 
[runtime] Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#discussion_r281001575
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for batch job which schedule vertices 
when input data are ready.
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(true);
+
+   private final JobGraph jobGraph;
+
+   private final Map<IntermediateDataSetID, SchedulingIntermediateDataSet> intermediateDataSets;
+
+   public LazyFromSourcesSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   this.jobGraph = checkNotNull(jobGraph);
+   this.intermediateDataSets = new HashMap<>();
+   }
+
+   @Override
+   public void startScheduling() {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new LinkedList<>();
+   for (SchedulingVertex schedulingVertex : 
schedulingTopology.getVertices()) {
+   if 
(jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+   // schedule vertices without consumed result 
partition
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+   }
+
+   Collection<SchedulingResultPartition> partitions = schedulingVertex.getProducedResultPartitions();
+   if (partitions != null) {
+   for (SchedulingResultPartition srp : 
partitions) {
+   SchedulingIntermediateDataSet sid = 
intermediateDataSets.computeIfAbsent(srp.getResultId(),
+   (key) -> new 
SchedulingIntermediateDataSet());
+   sid.increaseProducerCnt();
+   }
+   }
+   }
+   if (!executionVertexDeploymentOptions.isEmpty()) {
+   
schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+   }
+   }
+
+   @Override
+   public void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart) {
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size());
+   for (ExecutionVertexID executionVertexID : 
verticesNeedingRestart) {
+   executionVertexDeploymentOptions.add(
+   new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));

[GitHub] [flink] flinkbot commented on issue #8342: [FLINK-12360][chinese-translation]Translate Jobs and Scheduling Page …

2019-05-04 Thread GitBox
flinkbot commented on issue #8342: [FLINK-12360][chinese-translation]Translate 
Jobs and Scheduling Page …
URL: https://github.com/apache/flink/pull/8342#issuecomment-489383011
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12360) Translate "Jobs and Scheduling" Page into Chinese

2019-05-04 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12360:
---
Labels: pull-request-available  (was: )

> Translate "Jobs and Scheduling" Page into Chinese
> -
>
> Key: FLINK-12360
> URL: https://issues.apache.org/jira/browse/FLINK-12360
> Project: Flink
>  Issue Type: Task
>  Components: chinese-translation, Documentation
>Reporter: Armstrong Nova
>Assignee: Armstrong Nova
>Priority: Major
>  Labels: pull-request-available
>
> Translate the internal page 
> "[https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html];
>  to Chinese 
> the doc is located in "flink/docs/internals/job_scheduling.md", and the translated 
> doc in "flink/docs/internals/job_scheduling.zh.md"
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Armstrongya opened a new pull request #8342: [FLINK-12360][chinese-translation]Translate Jobs and Scheduling Page …

2019-05-04 Thread GitBox
Armstrongya opened a new pull request #8342: 
[FLINK-12360][chinese-translation]Translate Jobs and Scheduling Page …
URL: https://github.com/apache/flink/pull/8342
 
 
   Translate the internal page 
"https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html;
 to Chinese 
   
   
   
   ## What is the purpose of the change
   This pull request completes the Chinese translation of "job_scheduling" from 
official documents.
   
   
   
   ## Brief change log
- *translates internal document "job_scheduling" into Chinese*
   
   
   ## Verifying this change
   
   - *This change is to add a new translated document.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface

2019-05-04 Thread GitBox
zhuzhurk commented on a change in pull request #8233: [FLINK-12227] [runtime] 
introduce SchedulingStrategy interface
URL: https://github.com/apache/flink/pull/8233#discussion_r281001358
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
 ##
 @@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+/**
+ * Component that stores the task need to be scheduled and the option for 
deployment.
+ */
+public class ExecutionVertexDeploymentOption {
+
+   private final ExecutionVertexID executionVertexId;
+
+   private final DeploymentOption deploymentOption;
+
+   public ExecutionVertexDeploymentOption(ExecutionVertexID 
executionVertexId, DeploymentOption deploymentOption) {
 
 Review comment:
   Sorry, I made a mistake here. I was a bit confused by DeploymentOption and 
ExecutionVertexDeploymentOption due to their similar names. Now I understand that 
they are actually different components: ExecutionVertexDeploymentOption is a 
deployment descriptor, while DeploymentOption is a wrapper of the options.
   
   Just wondering whether we can use different naming patterns for them, such as 
ExecutionVertexDeploymentDescriptor and DeploymentOption?
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on issue #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
dianfu commented on issue #8311: [FLINK-10976][table] Add Aggregate operator to 
Table API
URL: https://github.com/apache/flink/pull/8311#issuecomment-489382199
 
 
   @hequn8128 Thanks a lot for the PR. LGTM overall with just a few comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000530
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
 ##
 @@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: 
TableEnvImpl) {
 aggregateOperationFactory.createAggregate(resolvedGroupings, 
resolvedAggregates, child)
   }
 
+  def rowBasedAggregate(
+groupingExpressions: JList[Expression],
 
 Review comment:
   indentation


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000842
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1889,6 +1889,57 @@ tableEnv.registerFunction("func", func);
 
 Table table = input
   .flatMap("func(c)").as("a, b")
+{% endhighlight %}
+  
+
+
+
+  
+Aggregate
+Batch Streaming
+  
+  
+Performs an aggregate operation with an aggregate function. You 
have to close the "aggregate" with a select statement. The output of aggregate 
will be flattened if the output type is a composite type.
+{% highlight java %}
+public class MyMinMaxAcc {
+public int min = 0;
+public int max = 0;
+}
+
+public class MyMinMax extends AggregateFunction {
+
+public void accumulate(MyMinMaxAcc acc, int value) {
+if (value < acc.min) {
+acc.min = value;
+}
+if (value > acc.max) {
+acc.max = value;
+}
+}
+
+@Override
+public MyMinMaxAcc createAccumulator() {
+return new MyMinMaxAcc();
+}
+
+@Override
+public Row getValue(MyMinMaxAcc acc) {
+return Row.of(acc.min, acc.max);
+}
+
+@Override
+public TypeInformation getResultType() {
+return new RowTypeInfo(Types.INT, Types.INT);
+}
+}
+
+AggregateFunction myAggFunc = new MyMinMax();
+
+tableEnv.registerFunction("myAggFunc", myAggFunc);
+Table table = input
+  .groupBy("key")
+  .aggregate("myAggFunc(a, b) as (x, y, z)")
 
 Review comment:
   The alias should be `as (x, y)` here, since `MyMinMax` returns only two fields.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281001161
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala
 ##
 @@ -47,4 +47,54 @@ class AggregateValidationTest extends TableTestBase {
   // must fail. 'c is not a grouping key or aggregation
   .select('c)
   }
+
+  @Test(expected = classOf[ValidationException])
+  def testTableFunctionInSelection(): Unit = {
 
 Review comment:
   This test failed with the exception:
   org.apache.flink.table.api.ValidationException: Given parameters of function 
'func' do not match any signature. 
   Actual: (java.lang.Long) 
   Expected: (java.lang.String)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000705
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
 ##
 @@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: 
TableEnvImpl) {
 aggregateOperationFactory.createAggregate(resolvedGroupings, 
resolvedAggregates, child)
   }
 
+  def rowBasedAggregate(
+groupingExpressions: JList[Expression],
+aggregate: Expression,
+child: TableOperation)
+  : TableOperation = {
+// resolve for java string case, i.e., turn LookupCallExpression to 
CallExpression.
+val resolver = resolverFor(tableCatalog, functionCatalog, child).build
+val resolvedAggregate = resolveSingleExpression(aggregate, resolver)
+
+// extract alias and aggregate function
+var alias: Seq[String] = Seq()
+val aggWithoutAlias = resolvedAggregate match {
+  case c: CallExpression
+if c.getFunctionDefinition.getName == 
BuiltInFunctionDefinitions.AS.getName => {
+alias = c.getChildren
+  .drop(1)
+  .map(e => 
e.asInstanceOf[ValueLiteralExpression].getValue.asInstanceOf[String])
+c.getChildren.get(0)
+  }
+  case c: CallExpression
+if c.getFunctionDefinition.isInstanceOf[AggregateFunctionDefinition] 
=> {
+if (alias.isEmpty) alias = UserDefinedFunctionUtils.getFieldInfo(
+  
c.getFunctionDefinition.asInstanceOf[AggregateFunctionDefinition].getResultTypeInfo)._1
+c
+  }
+  case e => e
 
 Review comment:
   this line can be removed


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000442
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala
 ##
 @@ -580,3 +596,30 @@ class OverWindowedTableImpl(
 )
   }
 }
+
+/**
+  * The implementation of a [[AggregatedTable]] that has been grouped on a set 
of grouping keys.
 
 Review comment:
   a -> an. Also, the description should read "that has been performed on an 
aggregate function" rather than "that has been grouped on a set of grouping keys".
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000507
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala
 ##
 @@ -580,3 +596,30 @@ class OverWindowedTableImpl(
 )
   }
 }
+
+/**
+  * The implementation of a [[AggregatedTable]] that has been grouped on a set 
of grouping keys.
+  */
+class AggregatedTableImpl(
+  private[flink] val table: Table,
 
 Review comment:
   indentation


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000356
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
 ##
 @@ -1058,4 +1058,39 @@
 * 
 */
Table flatMap(Expression tableFunction);
+
+   /**
+* Performs a global aggregate operation with an aggregate function. 
Use this before a selection
+* to perform the selection operation. The output will be flattened if 
the output type is a
+* composite type.
+*
+* Example:
+*
+* 
+* {@code
+*   AggregateFunction aggFunc = new MyAggregateFunction()
+*   tableEnv.registerFunction("aggFunc", aggFunc);
+*   table.aggregate("aggFunc(a, b) as (f0, f1, f2)")
+* .select("key, f0, f1")
 
 Review comment:
   select("f0, f1")?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000946
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -1957,6 +2008,52 @@ class MyFlatMapFunction extends TableFunction[Row] {
 val func = new MyFlatMapFunction
 val table = input
   .flatMap(func('c)).as('a, 'b)
+{% endhighlight %}
+  
+
+
+
+  
+Aggregate
+Batch Streaming
+  
+  
+Performs an aggregate operation with an aggregate function. You 
have to close the "aggregate" with a select statement. The output of aggregate 
will be flattened if the output type is a composite type.
+{% highlight scala %}
+case class MyMinMaxAcc(var min: Int, var max: Int)
+
+class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {
+
+  def accumulate(acc: MyMinMaxAcc, value: Int): Unit = {
+if (value < acc.min) {
+  acc.min = value
+}
+if (value > acc.max) {
+  acc.max = value
+}
+  }
+
+  def resetAccumulator(acc: MyMinMaxAcc): Unit = {
 
 Review comment:
   The Scala example has resetAccumulator, while the Java example does not. 
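   For symmetry, a minimal sketch of what the corresponding Java method could look 
like, assuming the `MyMinMaxAcc` accumulator class from the Java example above:
   
   ```java
   // Optional contract method of AggregateFunction: resets the accumulator
   // to the same initial state that createAccumulator() produces.
   public void resetAccumulator(MyMinMaxAcc acc) {
       acc.min = 0;
       acc.max = 0;
   }
   ```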


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000689
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
 ##
 @@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: 
TableEnvImpl) {
 aggregateOperationFactory.createAggregate(resolvedGroupings, 
resolvedAggregates, child)
   }
 
+  def rowBasedAggregate(
+groupingExpressions: JList[Expression],
+aggregate: Expression,
+child: TableOperation)
+  : TableOperation = {
+// resolve for java string case, i.e., turn LookupCallExpression to 
CallExpression.
+val resolver = resolverFor(tableCatalog, functionCatalog, child).build
+val resolvedAggregate = resolveSingleExpression(aggregate, resolver)
+
+// extract alias and aggregate function
+var alias: Seq[String] = Seq()
+val aggWithoutAlias = resolvedAggregate match {
+  case c: CallExpression
+if c.getFunctionDefinition.getName == 
BuiltInFunctionDefinitions.AS.getName => {
 
 Review comment:
   the brace is not necessary.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281001183
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala
 ##
 @@ -47,4 +47,54 @@ class AggregateValidationTest extends TableTestBase {
   // must fail. 'c is not a grouping key or aggregation
   .select('c)
   }
+
+  @Test(expected = classOf[ValidationException])
+  def testTableFunctionInSelection(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+util.tableEnv.registerFunction("func", new TableFunc0)
+table
+  .groupBy('a)
+  .aggregate('b.sum as 'd)
+  // must fail. Cannot use TableFunction in select after aggregate
+  .select("func(a)")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidExpressionInAggregate(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+table
+  .groupBy('a)
+  // must fail. Only AggregateFunction can be used in aggregate
 
 Review comment:
   The exception message is not user-friendly in this case:
   org.apache.flink.table.api.ValidationException: Invalid arguments [log(b), 
'd'] for function: as


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281001232
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala
 ##
 @@ -47,4 +47,54 @@ class AggregateValidationTest extends TableTestBase {
   // must fail. 'c is not a grouping key or aggregation
   .select('c)
   }
+
+  @Test(expected = classOf[ValidationException])
+  def testTableFunctionInSelection(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+util.tableEnv.registerFunction("func", new TableFunc0)
+table
+  .groupBy('a)
+  .aggregate('b.sum as 'd)
+  // must fail. Cannot use TableFunction in select after aggregate
+  .select("func(a)")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidExpressionInAggregate(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+table
+  .groupBy('a)
+  // must fail. Only AggregateFunction can be used in aggregate
+  .aggregate('b.log as 'd)
+  .select('a, 'd)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidExpressionInAggregate2(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+util.tableEnv.registerFunction("func", new TableFunc0)
+table
+  .groupBy('a)
+  // must fail. Only AggregateFunction can be used in aggregate
+  .aggregate("func(c) as d")
+  .select('a, 'd)
+  }
+
+  @Test(expected = classOf[ExpressionParserException])
+  def testMultipleAggregateExpressionInAggregate(): Unit = {
+val util = streamTestUtil()
+val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+util.tableEnv.registerFunction("func", new TableFunc0)
+table
+  .groupBy('a)
+  // must fail. Only AggregateFunction can be used in aggregate
 
 Review comment:
   Only AggregateFunction -> Only one AggregateFunction 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API

2019-05-04 Thread GitBox
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000677
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala
 ##
 @@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: 
TableEnvImpl) {
 aggregateOperationFactory.createAggregate(resolvedGroupings, 
resolvedAggregates, child)
   }
 
+  def rowBasedAggregate(
+groupingExpressions: JList[Expression],
+aggregate: Expression,
+child: TableOperation)
+  : TableOperation = {
+// resolve for java string case, i.e., turn LookupCallExpression to 
CallExpression.
+val resolver = resolverFor(tableCatalog, functionCatalog, child).build
+val resolvedAggregate = resolveSingleExpression(aggregate, resolver)
 
 Review comment:
   use resolveExpression?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r281000916
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -26,11 +26,11 @@ under the License.
 {:toc}
 
 Apache Flink 流应用通常被设计为永远或者长时间运行。
-与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。
-对于应用程序处理的数据结构来说,这是相同的;它们也随应用程序一起升级。
+与所有长期运行的服务一样,应用程序需要随着业务的迭代而进行调整。
+应用所处理的数据 schema 也会随着进行变化。
 
-此页面概述了如何升级状态类型的数据结构。
-目前的限制因不同类型和状态结构而异(`ValueState`、`ListState` 等)。
+此页面概述了如何升级状态类型的数据 schema 。
+目前对不同类系的状态结构(`ValueState`、`ListState` 等)有不同的限制
 
 Review comment:
   `类系` --> `类型`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r281000985
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -80,19 +80,19 @@ Flink 内部是这样来进行处理的,首先会检查新的序列化器相
 
 Flink 基于下面的规则来支持 [POJO 类型]({{ site.baseurl 
}}/zh/dev/types_serialization.html#pojo-类型的规则)结构的升级:
 
- 1. 不可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。
+ 1. 可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。
  2. 可以添加字段。新字段会使用类型对应的默认值进行初始化,比如 [Java 
类型](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html)。  
 
  3. 不可以修改字段的声明类型。
  4. 不可以改变 POJO 类型的类名,包括类的命名空间。
 
-需要注意,只有对使用1.8.0及以上版本的 Flink savepoint 进行恢复时,POJO 类型的状态才可以进行升级。
-对1.8.0版本之前的 Flink 是没有办法进行 POJO 类型升级的。
+需要注意,只有从 1.8.0 及以上版本的 Flink 生产的 savepoint 进行恢复时,POJO 类型的状态才可以进行升级。
+对 1.8.0 版本之前的 Flink 是没有办法进行 POJO 类型升级的。
 
 ### Avro 类型
 
 Flink 完全支持 Avro 状态类型的升级,只要数据结构的修改是被
 [Avro 
的数据结构解析规则](http://avro.apache.org/docs/current/spec.html#Schema+Resolution)认为兼容的即可。
 
-一个限制是,当作业恢复时,Avro 生成的用作状态类型的类无法重定位或具有不同的命名空间。
+如果新的 Avro 数据 schema 生产的类无法被重定位或者使用了不同的命名空间,状态是没办法在作业恢复时进行升级的。
 
 Review comment:
   `状态是没办法在作业恢复时进行升级的。`  seems like spoken language.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Leeviiii commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-04 Thread GitBox
Leev commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r281000793
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -48,62 +47,52 @@ checkpointedState = 
getRuntimeContext().getListState(descriptor);
 {% endhighlight %}
 
 
-Under the hood, whether or not the schema of state can be evolved depends on 
the serializer used to read / write
-persisted state bytes. Simply put, a registered state's schema can only be 
evolved if its serializer properly
-supports it. This is handled transparently by serializers generated by Flink's 
type serialization framework
-(current scope of support is listed [below]({{ site.baseurl 
}}/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution)).
+在内部,状态是否可以进行升级取决于用于读写持久化状态字节的序列化器。
+简而言之,状态数据结构只有在其序列化器正确支持时才能升级。
+这一过程是被 Flink 的类型序列化框架生成的序列化器透明处理的([下面]({{ site.baseurl 
}}/zh/dev/stream/state/schema_evolution.html#数据结构升级支持的数据类型) 列出了当前的支持范围)。
 
-If you intend to implement a custom `TypeSerializer` for your state type and 
would like to learn how to implement
-the serializer to support state schema evolution, please refer to
-[Custom State Serialization]({{ site.baseurl 
}}/dev/stream/state/custom_serialization.html).
-The documentation there also covers necessary internal details about the 
interplay between state serializers and Flink's
-state backends to support state schema evolution.
+如果你想要为你的状态类型实现自定义的 `TypeSerializer` 并且想要学习如何实现支持状态数据结构升级的序列化器,
+可以参考 [自定义状态序列化器]({{ site.baseurl 
}}/zh/dev/stream/state/custom_serialization.html)。
+本文档也包含一些用于支持状态数据结构升级的状态序列化器与 Flink 状态后端存储相互作用的必要内部细节。
 
-## Evolving state schema
+## 升级状态数据结构
 
-To evolve the schema of a given state type, you would take the following steps:
+为了对给定的状态类型进行升级,你需要采取以下几个步骤:
 
- 1. Take a savepoint of your Flink streaming job.
- 2. Update state types in your application (e.g., modifying your Avro type 
schema).
- 3. Restore the job from the savepoint. When accessing state for the first 
time, Flink will assess whether or not
- the schema had been changed for the state, and migrate state schema if 
necessary.
+ 1. 对 Flink 流作业进行 savepoint 操作。
+ 2. 升级程序中的状态类型(例如:修改你的 Avro 结构)。
+ 3. 从 savepoint 处重启作业。当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。
 
-The process of migrating state to adapt to changed schemas happens 
automatically, and independently for each state.
-This process is performed internally by Flink by first checking if the new 
serializer for the state has different
-serialization schema than the previous serializer; if so, the previous 
serializer is used to read the state to objects,
-and written back to bytes again with the new serializer.
+用来适应状态结构的改变而进行的状态迁移过程是自动发生的,并且状态之间是互相独立的。
+Flink 内部是这样来进行处理的,首先会检查新的序列化器相对比之前的序列化器是否有不同的状态结构;如果有,
+那么之前的序列化器用来读取状态数据字节到对象,然后使用新的序列化器将对象回写为字节。
 
-Further details about the migration process is out of the scope of this 
documentation; please refer to
-[here]({{ site.baseurl }}/dev/stream/state/custom_serialization.html).
+更多的迁移过程细节不在本文档谈论的范围;可以参考[文档]({{ site.baseurl 
}}/zh/dev/stream/state/custom_serialization.html)。
 
-## Supported data types for schema evolution
+## 数据结构升级支持的数据类型
 
-Currently, schema evolution is supported only for POJO and Avro types. 
Therefore, if you care about schema evolution for
-state, it is currently recommended to always use either Pojo or Avro for state 
data types.
+目前,仅对于 POJO 以及 Avro 类型支持数据结构升级。
+因此,如果你比较关注于状态数据结构的升级,那么目前来看强烈推荐使用 Pojo 或者 Avro 状态数据类型。
 
-There are plans to extend the support for more composite types; for more 
details,
-please refer to 
[FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896).
+我们有计划支持更多的复合类型;更多的细节可以参考 
[FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896)。
 
-### POJO types
+### POJO 类型
 
-Flink supports evolving schema of [POJO types]({{ site.baseurl 
}}/dev/types_serialization.html#rules-for-pojo-types),
-based on the following set of rules:
+Flink 基于下面的规则来支持 [POJO 类型]({{ site.baseurl 
}}/zh/dev/types_serialization.html#pojo-类型的规则)结构的升级:
 
- 1. Fields can be removed. Once removed, the previous value for the removed 
field will be dropped in future checkpoints and savepoints.
- 2. New fields can be added. The new field will be initialized to the default 
value for its type, as
-[defined by 
Java](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html).
- 3. Declared fields types cannot change.
- 4. Class name of the POJO type cannot change, including the namespace of the 
class.
+ 1. 不可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。
+ 2. 可以添加字段。新字段会使用类型对应的默认值进行初始化,比如 [Java 
类型](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html)。  
 
+ 3. 不可以修改字段的声明类型。
+ 4. 不可以改变 POJO 类型的类名,包括类的命名空间。
 
-Note that the schema of POJO type state can only be evolved when restoring 
from a previous savepoint with Flink versions
-newer than 1.8.0. When 

[jira] [Comment Edited] (FLINK-12400) NullpointerException using SimpleStringSchema with Kafka

2019-05-04 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833187#comment-16833187
 ] 

Congxian Qiu(klion26) edited comment on FLINK-12400 at 5/5/19 1:52 AM:
---

[~PierreZ] thanks for filing this issue. I think there is already an issue that aims 
to fix this; please have a look at 
https://issues.apache.org/jira/browse/FLINK-11820

There is also a PR for FLINK-11820: [https://github.com/apache/flink/pull/7987]


was (Author: klion26):
[~PierreZ] thanks for filing this issue, I think there is already an issue want 
to fix this, please have a look at 
https://issues.apache.org/jira/browse/FLINK-11820

> NullpointerException using SimpleStringSchema with Kafka
> 
>
> Key: FLINK-12400
> URL: https://issues.apache.org/jira/browse/FLINK-12400
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Affects Versions: 1.7.2, 1.8.0
> Environment: Flink 1.7.2 job on 1.8 cluster
> Kafka 0.10 with a topic in log-compaction
>Reporter: Pierre Zemb
>Assignee: Pierre Zemb
>Priority: Minor
>
> Hi!
> Yesterday, we saw a strange behavior with our Flink job and Kafka. We are 
> consuming a Kafka topic setup in 
> [log-compaction|https://kafka.apache.org/documentation/#compaction] mode. As 
> such, sending a message with a null payload acts like a tombstone.
> We are consuming Kafka like this:
> {code:java}
> new FlinkKafkaConsumer010<>  ("topic", new SimpleStringSchema(), 
> this.kafkaProperties)
> {code}
> When we sent the message, job failed because of a NullPointerException 
> [here|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java#L75].
>  `byte[] message` was null, causing the NPE. 
> We forked the class and added a basic nullable check, returning null if so. 
> It fixed our issue. 
> Should we add it to the main class?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12400) NullpointerException using SimpleStringSchema with Kafka

2019-05-04 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833187#comment-16833187
 ] 

Congxian Qiu(klion26) commented on FLINK-12400:
---

[~PierreZ] thanks for filing this issue, I think there is already an issue want 
to fix this, please have a look at 
https://issues.apache.org/jira/browse/FLINK-11820
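
For reference, a rough sketch of the workaround described in the ticket below (a 
user-side subclass that tolerates null Kafka payloads; this is not the fix proposed 
in FLINK-11820 itself):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;

/**
 * Returns null for tombstone records (null Kafka payloads) instead of
 * failing with a NullPointerException.
 */
public class NullTolerantStringSchema extends SimpleStringSchema {

	@Override
	public String deserialize(byte[] message) {
		return message == null ? null : super.deserialize(message);
	}
}
{code}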

> NullpointerException using SimpleStringSchema with Kafka
> 
>
> Key: FLINK-12400
> URL: https://issues.apache.org/jira/browse/FLINK-12400
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Affects Versions: 1.7.2, 1.8.0
> Environment: Flink 1.7.2 job on 1.8 cluster
> Kafka 0.10 with a topic in log-compaction
>Reporter: Pierre Zemb
>Assignee: Pierre Zemb
>Priority: Minor
>
> Hi!
> Yesterday, we saw a strange behavior with our Flink job and Kafka. We are 
> consuming a Kafka topic setup in 
> [log-compaction|https://kafka.apache.org/documentation/#compaction] mode. As 
> such, sending a message with a null payload acts like a tombstone.
> We are consuming Kafka like this:
> {code:java}
> new FlinkKafkaConsumer010<>  ("topic", new SimpleStringSchema(), 
> this.kafkaProperties)
> {code}
> When we sent the message, job failed because of a NullPointerException 
> [here|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java#L75].
>  `byte[] message` was null, causing the NPE. 
> We forked the class and added a basic nullable check, returning null if so. 
> It fixed our issue. 
> Should we add it to the main class?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12401) Support incremental emit for non-window streaming FlatAggregate on Table API

2019-05-04 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833186#comment-16833186
 ] 

Jark Wu commented on FLINK-12401:
-

By "incremental value", do you mean the ACC? And is this issue going to support the 
local-combine + global optimization for flatAggregate?

> Support incremental emit for non-window streaming FlatAggregate on Table API
> 
>
> Key: FLINK-12401
> URL: https://issues.apache.org/jira/browse/FLINK-12401
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As described in 
> [Flip-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739],
>  there are two output modes for non-window streaming flatAggregate. One is 
> emitting with full values, the other is emitting with incremental values. 
> [FLINK-10977|https://issues.apache.org/jira/browse/FLINK-10977] supports the 
> former one, this jira is going to support the latter one. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Leeviiii commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese

2019-05-04 Thread GitBox
Leev commented on a change in pull request #8319: [FLINK-11636] [docs-zh] 
Translate "State Schema Evolution" into Chinese
URL: https://github.com/apache/flink/pull/8319#discussion_r281000414
 
 

 ##
 File path: docs/dev/stream/state/schema_evolution.zh.md
 ##
 @@ -25,17 +25,16 @@ under the License.
 * ToC
 {:toc}
 
-Apache Flink streaming applications are typically designed to run indefinitely 
or for long periods of time.
-As with all long-running services, the applications need to be updated to 
adapt to changing requirements.
-This goes the same for data schemas that the applications work against; they 
evolve along with the application.
+Apache Flink 流应用通常被设计为永远或者长时间运行。
+与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。
 
 Review comment:
   OK, thanks for your review.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12401) Support incremental emit for non-window streaming FlatAggregate on Table API

2019-05-04 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-12401:
---

 Summary: Support incremental emit for non-window streaming 
FlatAggregate on Table API
 Key: FLINK-12401
 URL: https://issues.apache.org/jira/browse/FLINK-12401
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Hequn Cheng
Assignee: Hequn Cheng


As described in 
[Flip-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739],
 there are two output modes for non-window streaming flatAggregate: one emits 
full values, the other emits incremental values. 
[FLINK-10977|https://issues.apache.org/jira/browse/FLINK-10977] supports the 
former; this JIRA is going to support the latter. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support

2019-05-04 Thread GitBox
sunjincheng121 commented on a change in pull request #8267: 
[FLINK-12311][table][python] Add base python framework and Add Scan, 
Projection, and Filter operator support
URL: https://github.com/apache/flink/pull/8267#discussion_r281000307
 
 

 ##
 File path: tools/verify_scala_suffixes.sh
 ##
 @@ -84,8 +84,9 @@ block_infected=0
 # a) are not deployed during a release
 # b) exist only for dev purposes
 # c) no-one should depend on them
+# exclude flink-python because there are 2 flink-python module currently, 
current logic goes wrong on this situation
 e2e_modules=$(find flink-end-to-end-tests -mindepth 2 -maxdepth 5 -name 
'pom.xml' -printf '%h\n' | sort -u | tr '\n' ',')
-excluded_modules=\!${e2e_modules//,/,\!},!flink-docs
+excluded_modules=\!${e2e_modules//,/,\!},!flink-docs,!flink-python,!flink-libraries/flink-python
 
 Review comment:
   Good points!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support

2019-05-04 Thread GitBox
sunjincheng121 commented on a change in pull request #8267: 
[FLINK-12311][table][python] Add base python framework and Add Scan, 
Projection, and Filter operator support
URL: https://github.com/apache/flink/pull/8267#discussion_r281000283
 
 

 ##
 File path: flink-python/pyflink/table/table.py
 ##
 @@ -0,0 +1,117 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from py4j.java_gateway import get_method
+
+__all__ = ['Table']
+
+
+class Table(object):
+
+"""
+A :class:`Table` is the core component of the Table API.
+Similar to how the batch and streaming APIs have DataSet and DataStream,
+the Table API is built around :class:`Table`.
+
+Use the methods of :class:`Table` to transform data.
+
+Example:
+::
+>>> t_config = 
TableConfig.Builder().as_streaming_execution().set_parallelism(1).build()
+>>> t_env = TableEnvironment.get_table_environment(t_config)
+>>> ...
+>>> t = t_env.scan("source")
+>>> t.select(...)
+...
+>>> t.insert_into("print")
+>>> t_env.execute()
+
+Operations such as :func:`~pyflink.table.Table.join`, 
:func:`~pyflink.table.Table.select`,
+:func:`~pyflink.table.Table.where` and 
:func:`~pyflink.table.Table.group_by`
+take arguments in an expression string. Please refer to the documentation 
for
+the expression syntax.
+"""
+
+def __init__(self, j_table):
+self._j_table = j_table
+
+def select(self, fields):
+"""
+Performs a selection operation. Similar to a SQL SELECT statement. The 
field expressions
+can contain complex expressions.
+
+Example:
+::
+>>> t = tab.select("key, value + 'hello'")
 
 Review comment:
   The reason I suggest removing the `t` is that we do not use `t` in the demo. See 
the example in `Table.java`: 
https://github.com/apache/flink/blob/9975e0393a9e09dbde21ca61f1a5751cc5934411/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java#L114


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support

2019-05-04 Thread GitBox
sunjincheng121 commented on a change in pull request #8267: 
[FLINK-12311][table][python] Add base python framework and Add Scan, 
Projection, and Filter operator support
URL: https://github.com/apache/flink/pull/8267#discussion_r281000156
 
 

 ##
 File path: flink-python/pyflink/find_flink_home.py
 ##
 @@ -0,0 +1,44 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import os
+import sys
+
+
+def _find_flink_home():
+"""
+Find the FLINK_HOME.
 
 Review comment:
   Yes, adding the message to the log system is better!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8230: [FLINK-10977][table] Add streaming non-window FlatAggregate to Table API

2019-05-04 Thread GitBox
sunjincheng121 commented on issue #8230: [FLINK-10977][table] Add streaming 
non-window FlatAggregate to Table API
URL: https://github.com/apache/flink/pull/8230#issuecomment-489378092
 
 
   Thanks for your update!
   The changes look good to me now!
   
   One more thing: since the current PR does not include the implementation of the 
incremental mode, could you create a JIRA for it?
   
   Best, Jincheng


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9679) Implement ConfluentRegistryAvroSerializationSchema

2019-05-04 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833108#comment-16833108
 ] 

Bowen Li commented on FLINK-9679:
-

Hi [~dawidwys], what do you think of the relationship between this JIRA and 
[FLINK-12256|https://issues.apache.org/jira/browse/FLINK-12256] ?

> Implement ConfluentRegistryAvroSerializationSchema
> --
>
> Key: FLINK-9679
> URL: https://issues.apache.org/jira/browse/FLINK-9679
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.6.0
>Reporter: Yazdan Shirvany
>Assignee: Dominik Wosiński
>Priority: Major
>  Labels: pull-request-available
>
> Implement AvroSerializationSchema using Confluent Schema Registry



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12399) FilterableTableSource does not use filters on job run

2019-05-04 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833107#comment-16833107
 ] 

Rong Rong commented on FLINK-12399:
---

Hi [~josh.bradt]. I think I found the root cause of this issue.

Apparently you have to override the method {{explainSource}} so that Calcite knows 
that the newly created TableSource with the filter pushed down is different from the 
originally created CustomTableSource (where no predicates have been applied).
I think this might be related to the #4 changelog point of 
https://github.com/apache/flink/pull/8324: when I tried upgrading to Calcite 
1.19.0, I also encountered some weird issues where Calcite tries to find the 
correct table source from the digest strings. 

I will assign this to myself and start looking into the issue. Please let me know 
whether adding the override resolves your problem for the moment.
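
For reference, a minimal sketch of the kind of override I mean, applied to the 
CustomTableSource from the ticket (the exact string does not matter, as long as it 
reflects the pushed-down filters so that the plan digests differ):

{code:java}
@Override
public String explainSource() {
	// Include the pushed-down filters in the digest so that the source with
	// filters is not treated as identical to the original, unfiltered source.
	return "CustomTableSource(filters: " + java.util.Arrays.toString(filters) + ")";
}
{code}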

> FilterableTableSource does not use filters on job run
> -
>
> Key: FLINK-12399
> URL: https://issues.apache.org/jira/browse/FLINK-12399
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.8.0
>Reporter: Josh Bradt
>Priority: Major
> Attachments: flink-filter-bug.tar.gz
>
>
> As discussed [on the mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html],
>  there appears to be a bug where a job that uses a custom 
> FilterableTableSource does not keep the filters that were pushed down into 
> the table source. More specifically, the table source does receive filters 
> via applyPredicates, and a new table source with those filters is returned, 
> but the final job graph appears to use the original table source, which does 
> not contain any filters.
> I attached a minimal example program to this ticket. The custom table source 
> is as follows: 
> {code:java}
> public class CustomTableSource implements BatchTableSource, 
> FilterableTableSource {
> private static final Logger LOG = 
> LoggerFactory.getLogger(CustomTableSource.class);
> private final Filter[] filters;
> private final FilterConverter converter = new FilterConverter();
> public CustomTableSource() {
> this(null);
> }
> private CustomTableSource(Filter[] filters) {
> this.filters = filters;
> }
> @Override
> public DataSet getDataSet(ExecutionEnvironment execEnv) {
> if (filters == null) {
>LOG.info(" No filters defined ");
> } else {
> LOG.info(" Found filters ");
> for (Filter filter : filters) {
> LOG.info("FILTER: {}", filter);
> }
> }
> return execEnv.fromCollection(allModels());
> }
> @Override
> public TableSource applyPredicate(List predicates) {
> LOG.info("Applying predicates");
> List acceptedFilters = new ArrayList<>();
> for (final Expression predicate : predicates) {
> converter.convert(predicate).ifPresent(acceptedFilters::add);
> }
> return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
> }
> @Override
> public boolean isFilterPushedDown() {
> return filters != null;
> }
> @Override
> public TypeInformation getReturnType() {
> return TypeInformation.of(Model.class);
> }
> @Override
> public TableSchema getTableSchema() {
> return TableSchema.fromTypeInfo(getReturnType());
> }
> private List allModels() {
> List models = new ArrayList<>();
> models.add(new Model(1, 2, 3, 4));
> models.add(new Model(10, 11, 12, 13));
> models.add(new Model(20, 21, 22, 23));
> return models;
> }
> }
> {code}
>  
> When run, it logs
> {noformat}
> 15:24:54,888 INFO  com.klaviyo.filterbug.CustomTableSource
>- Applying predicates
> 15:24:54,901 INFO  com.klaviyo.filterbug.CustomTableSource
>- Applying predicates
> 15:24:54,910 INFO  com.klaviyo.filterbug.CustomTableSource
>- Applying predicates
> 15:24:54,977 INFO  com.klaviyo.filterbug.CustomTableSource
>-  No filters defined {noformat}
> which appears to indicate that although filters are getting pushed down, the 
> final job does not use them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12399) FilterableTableSource does not use filters on job run

2019-05-04 Thread Rong Rong (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong reassigned FLINK-12399:
-

Assignee: Rong Rong

> FilterableTableSource does not use filters on job run
> -
>
> Key: FLINK-12399
> URL: https://issues.apache.org/jira/browse/FLINK-12399
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.8.0
>Reporter: Josh Bradt
>Assignee: Rong Rong
>Priority: Major
> Attachments: flink-filter-bug.tar.gz
>
>
> As discussed [on the mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html],
>  there appears to be a bug where a job that uses a custom 
> FilterableTableSource does not keep the filters that were pushed down into 
> the table source. More specifically, the table source does receive filters 
> via applyPredicates, and a new table source with those filters is returned, 
> but the final job graph appears to use the original table source, which does 
> not contain any filters.
> I attached a minimal example program to this ticket. The custom table source 
> is as follows: 
> {code:java}
> public class CustomTableSource implements BatchTableSource, 
> FilterableTableSource {
> private static final Logger LOG = 
> LoggerFactory.getLogger(CustomTableSource.class);
> private final Filter[] filters;
> private final FilterConverter converter = new FilterConverter();
> public CustomTableSource() {
> this(null);
> }
> private CustomTableSource(Filter[] filters) {
> this.filters = filters;
> }
> @Override
> public DataSet getDataSet(ExecutionEnvironment execEnv) {
> if (filters == null) {
>LOG.info(" No filters defined ");
> } else {
> LOG.info(" Found filters ");
> for (Filter filter : filters) {
> LOG.info("FILTER: {}", filter);
> }
> }
> return execEnv.fromCollection(allModels());
> }
> @Override
> public TableSource applyPredicate(List predicates) {
> LOG.info("Applying predicates");
> List acceptedFilters = new ArrayList<>();
> for (final Expression predicate : predicates) {
> converter.convert(predicate).ifPresent(acceptedFilters::add);
> }
> return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
> }
> @Override
> public boolean isFilterPushedDown() {
> return filters != null;
> }
> @Override
> public TypeInformation getReturnType() {
> return TypeInformation.of(Model.class);
> }
> @Override
> public TableSchema getTableSchema() {
> return TableSchema.fromTypeInfo(getReturnType());
> }
> private List allModels() {
> List models = new ArrayList<>();
> models.add(new Model(1, 2, 3, 4));
> models.add(new Model(10, 11, 12, 13));
> models.add(new Model(20, 21, 22, 23));
> return models;
> }
> }
> {code}
>  
> When run, it logs
> {noformat}
> 15:24:54,888 INFO  com.klaviyo.filterbug.CustomTableSource
>- Applying predicates
> 15:24:54,901 INFO  com.klaviyo.filterbug.CustomTableSource
>- Applying predicates
> 15:24:54,910 INFO  com.klaviyo.filterbug.CustomTableSource
>- Applying predicates
> 15:24:54,977 INFO  com.klaviyo.filterbug.CustomTableSource
>-  No filters defined {noformat}
> which appears to indicate that although filters are getting pushed down, the 
> final job does not use them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11909) Provide default failure/timeout/backoff handling strategy for AsyncIO functions

2019-05-04 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831421#comment-16831421
 ] 

Rong Rong edited comment on FLINK-11909 at 5/4/19 3:28 PM:
---

I took some time to look at the {{AsyncFunction}} API and the 
{{AsyncWaitOperator}}. My understanding is that when a user implements an 
AsyncFunction, they have full control over how asyncInvoke is executed (including 
the creation of the ExecutorService used and the number of threads). Thus the user 
can also add any retry policy if needed, by directly building a retry mechanism.

The limitations of this are:
 1. Users do not have access to the timer service from AsyncFunction.
 - By contrast, {{AsyncWaitOperator}} can register / clean up / invoke the 
{{AsyncFunction#timeout}} API, which makes it more flexible for building retry 
policies.

2. A retry inside {{AsyncFunction#asyncInvoke}} is independent of the input element.
 - This does not fit the "rate limiting" requirement from the mailing list: I am 
assuming that is more like a retry queue for a specific outbound service, not 
limited to a specific input element.
 - Put differently, some sort of token-based rate-limit coordination is needed 
across input elements.

3. A retry inside {{AsyncFunction}} cannot access the {{StreamElementQueue}}, which 
means it cannot alter the order in which elements are executed.
 - This does not fit the DLQ described in Shuyi's comment (where a failed element is 
put at the back of the queue before it is retried a second time).

I have a general idea to extend the {{RuntimeContext}} with a {{RetryContext}} to 
solve these issues. But I also want to make sure that the concerns above are valid 
and should be addressed by Flink; otherwise, I think the current AsyncFunction API 
is good enough for some basic retry policies.

Any thoughts? [~till.rohrmann] [~suez1224]



> Provide default failure/timeout/backoff handling strategy for AsyncIO 
> functions
> ---
>
> Key: FLINK-11909
> URL: https://issues.apache.org/jira/browse/FLINK-11909
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, Flink AsyncIO by default fails the entire job when an async function 
> invocation fails [1]. It would be nice to have a default Async IO 
> failure/timeout handling strategy, or to open up some APIs for the AsyncFunction 
> timeout method to interact with the AsyncWaitOperator. For example (quoting 
> [~suez1224] in [2]):
> * FAIL_OPERATOR (default & current behavior)
> * FIX_INTERVAL_RETRY (retry with a configurable fixed interval, up to N times)
> * EXP_BACKOFF_RETRY (retry with exponential backoff, up to N times)
> The discussion also extended to introducing configuration such as: 
> * MAX_RETRY_COUNT
> * RETRY_FAILURE_POLICY
> REF:
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
> [2] 
> 

[jira] [Assigned] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled

2019-05-04 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-12351:
---

Assignee: Jark Wu

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
> Key: FLINK-12351
> URL: https://issues.apache.org/jira/browse/FLINK-12351
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, AsyncWaitOperator directly puts the input StreamElement into the 
> {{StreamElementQueue}}. But when object reuse is enabled, the StreamElement 
> is reused, which means the element in the {{StreamElementQueue}} will be 
> modified. As a result, the output of AsyncWaitOperator might be wrong.
> An easy way to fix this might be to deep copy the input StreamElement when 
> object reuse is enabled, like this: 
> https://github.com/apache/flink/blob/blink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L209
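
A rough sketch of the deep-copy idea described above (this is not the actual 
patch from the linked branch; the helper class and method names are made up 
here for illustration):

{code:java}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;

// Illustrative helper: deep copy the incoming element before it is queued
// when object reuse is enabled, so later mutations of the reused instance
// cannot corrupt the entry in the StreamElementQueue.
final class StreamElementCopyUtil {

    private StreamElementCopyUtil() {}

    static <IN> StreamElement copyIfObjectReuseEnabled(
            StreamElement element,
            TypeSerializer<IN> inputSerializer,
            ExecutionConfig config) {
        if (!config.isObjectReuseEnabled()) {
            return element;
        }
        // StreamElementSerializer copies the record value with the wrapped
        // serializer and handles watermarks / latency markers as well.
        // In real code the serializer instance would be created once and reused.
        return new StreamElementSerializer<>(inputSerializer).copy(element);
    }
}
{code}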



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wuchong commented on issue #8302: [FLINK-12269][table-blink] Support Temporal Table Join in blink planner and runtime

2019-05-04 Thread GitBox
wuchong commented on issue #8302: [FLINK-12269][table-blink] Support Temporal 
Table Join in blink planner and runtime
URL: https://github.com/apache/flink/pull/8302#issuecomment-489331877
 
 
   Hi @KurtYoung, I have addressed the review comments.
   
   1. add unit tests for all join runners
   2. fix a code generation problem when a temporal join has a UDF filter (a bug 
recently found internally)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] 
Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r280980371
 
 

 ##
 File path: docs/ops/state/savepoints.zh.md
 ##
 @@ -25,32 +25,22 @@ under the License.
 * toc
 {:toc}
 
-## What is a Savepoint? How is a Savepoint different from a Checkpoint?
+## 什么是 Savepoint ? Savepoint 与 Checkpoint 有什么不同?
 
-A Savepoint is a consistent image of the execution state of a streaming job, 
created via Flink's [checkpointing mechanism]({{ site.baseurl 
}}/internals/stream_checkpointing.html). You can use Savepoints to 
stop-and-resume, fork,
-or update your Flink jobs. Savepoints consist of two parts: a directory with 
(typically large) binary files on stable storage (e.g. HDFS, S3, ...) and a 
(relatively small) meta data file. The files on stable storage represent the 
net data of the job's execution state
-image. The meta data file of a Savepoint contains (primarily) pointers to all 
files on stable storage that are part of the Savepoint, in form of absolute 
paths.
+Savepoint 是依据 Flink [checkpointing 机制]({{ site.baseurl 
}}/internals/stream_checkpointing.html)所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 
Flink 作业的停止与重启、fork或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,...) 
上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 
的元数据文件以(绝对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。
 
 
-Attention: In order to allow upgrades between programs and 
Flink versions, it is important to check out the following section about assigning IDs to your operators.
+注意: 为了允许程序和 Flink 版本之间的升级,请务必查看以下有关分配算子 ID 的部分 。
 
+从概念上讲, Flink 的 Savepoint 与 Checkpoint 的不同之处类似于传统数据库中的备份与恢复日志之间的差异。 Checkpoint 
的主要目的是为意外失败的作为提供恢复机制。 Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 
无需用户交互。 作为一种恢复和定期触发的方法,Checkpoint 实现有两个设计目标:i)轻量级创建和 ii)尽可能快地恢复。 
可能会利用某些特定的属性来达到这个,例如, 工作代码在执行尝试之间不会改变。 在用户终止作业后,通常会删除 Checkpoint(除非明确配置为保留的 
Checkpoint)。
 
-Conceptually, Flink's Savepoints are different from Checkpoints in a similar 
way that backups are different from recovery logs in traditional database 
systems. The primary purpose of Checkpoints is to provide a recovery mechanism 
in case of
-unexpected job failures. A Checkpoint's lifecycle is managed by Flink, i.e. a 
Checkpoint is created, owned, and released by Flink - without user interaction. 
As a method of recovery and being periodically triggered, two main
-design goals for the Checkpoint implementation are i) being as lightweight to 
create and ii) being as fast to restore from as possible. Optimizations towards 
those goals can exploit certain properties, e.g. that the job code
-doesn't change between the execution attempts. Checkpoints are usually dropped 
after the job was terminated by the user (except if explicitly configured as 
retained Checkpoints).
+ 与此相反、Savepoint 由用户创建,拥有和删除。 他们的用例是计划的,手动备份和恢复。 例如,升级 Flink 
版本,调整用户逻辑,改变并行度,以及进行红蓝部署等。 当然,Savepoint 必须在终止工作后继续存在。 从概念上讲,Savepoint 
的生成,恢复成本可能更高一些,Savepoint 更多地关注可移植性和对前面提到的作业更改的支持。
 
-In contrast to all this, Savepoints are created, owned, and deleted by the 
user. Their use-case is for planned, manual backup and resume. For example, 
this could be an update of your Flink version, changing your job graph,
-changing parallelism, forking a second job like for a red/blue deployment, and 
so on. Of course, Savepoints must survive job termination. Conceptually, 
Savepoints can be a bit more expensive to produce and restore and focus
-more on portability and support for the previously mentioned changes to the 
job.
+除去这些概念上的差异,Checkpoint 和 Savepoint 
的当前实现基本上使用相同的代码并生成相同的格式。然而,目前有一个例外,我们可能会在未来引入更多的差异。例外情况是使用 RocksDB 状态后端的增量 
Checkpoint。他们使用了一些 RocksDB 内部格式,而不是 Flink 的本机 Savepoint 格式。这使他们成为了与 Savepoint 
相比,更轻量级的 Checkpoint 机制的第一个实例。
 
-Those conceptual differences aside, the current implementations of Checkpoints 
and Savepoints are basically using the same code and produce the same format. 
However, there is currently one exception from this, and we might
-introduce more differences in the future. The exception are incremental 
checkpoints with the RocksDB state backend. They are using some RocksDB 
internal format instead of Flink’s native savepoint format. This makes them the
-first instance of a more lightweight checkpointing mechanism, compared to 
Savepoints.
+## 分配算子ID
 
-## Assigning Operator IDs
-
-It is **highly recommended** that you adjust your programs as described in 
this section in order to be able to upgrade your programs in the future. The 
main required change is to manually specify operator IDs via the 
**`uid(String)`** method. These IDs are used to scope the state of each 
operator.
+**强烈建议**你按照本节所述调整你的程序,以便将来能够升级你的程序。主要需要的更改是通过**`uid(String)`**方法手动指定算子 ID 。这些 
ID 用于限定每个操作符的状态。
 
 Review comment:
   I think this sentence can be improved `主要需要的更改是通过**`uid(String)`**方法手动指定算子 
ID`


This is an automated message from the Apache Git 

[GitHub] [flink] klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] 
Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r280980514
 
 

 ##
 File path: docs/ops/state/savepoints.zh.md
 ##
 @@ -78,160 +68,158 @@ source-id   | State of StatefulSource
 mapper-id   | State of StatefulMapper
 {% endhighlight %}
 
-In the above example, the print sink is stateless and hence not part of the 
savepoint state. By default, we try to map each entry of the savepoint back to 
the new program.
+在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 
的每个条目映射回新程序。
 
-## Operations
+## 算子
 
-You can use the [command line client]({{ site.baseurl 
}}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a 
savepoint*, *resume from savepoints*, and *dispose savepoints*.
+你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint 
*,*取消具有 Savepoint *的作业,*从 Savepoint *恢复,以及*部署 Savepoint *。
 
-With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the 
webui.
+使用 Flink >= 1.2.0,还可以使用 webui *从 Savepoint 恢复*。
 
-### Triggering Savepoints
+### 触发 Savepoint
 
-When triggering a savepoint, a new savepoint directory is created where the 
data as well as the meta data will be stored. The location of this directory 
can be controlled by [configuring a default target directory](#configuration) 
or by specifying a custom target directory with the trigger commands (see the 
[`:targetDirectory` argument](#trigger-a-savepoint)).
+当触发 Savepoint 时,将创建一个新的 Savepoint 
目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory
 `参数](#trigger-a-savepoint)来控制该目录的位置。
 
 
-Attention: The target directory has to be a location 
accessible by both the JobManager(s) and TaskManager(s) e.g. a location on a 
distributed file-system.
+注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 
都可以访问的位置,例如分布式文件系统上的位置。
 
 
-For example with a `FsStateBackend` or `RocksDBStateBackend`:
+例如,使用 `FsStateBackend`  或 `RocksDBStateBackend` :
 
 {% highlight shell %}
-# Savepoint target directory
-/savepoints/
+# Savepoint 目标目录
+/Savepoint/
 
-# Savepoint directory
-/savepoints/savepoint-:shortjobid-:savepointid/
+# Savepoint 目录
+/Savepoint/savepoint-:shortjobid-:savepointid/
 
-# Savepoint file contains the checkpoint meta data
-/savepoints/savepoint-:shortjobid-:savepointid/_metadata
+# Savepoint 文件包含 Checkpoint元数据
+/Savepoint/savepoint-:shortjobid-:savepointid/_metadata
 
-# Savepoint state
-/savepoints/savepoint-:shortjobid-:savepointid/...
+# Savepoint 状态
+/Savepoint/savepoint-:shortjobid-:savepointid/...
 {% endhighlight %}
 
 
-  Note:
-Although it looks as if the savepoints may be moved, it is currently not 
possible due to absolute paths in the _metadata file.
-Please follow https://issues.apache.org/jira/browse/FLINK-5778;>FLINK-5778 for 
progress on lifting this restriction.
+  注意:
+虽然看起来好像可以移动 Savepoint ,但由于 _metadata 文件中的绝对路径,目前无法进行保存。
 
 Review comment:
   `但由于 _metadata 文件中的绝对路径,目前无法进行保存。` --> `但由于 _metadata 中保存的是绝对路径,因此暂时不支持。`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] 
Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r280980459
 
 

 ##
 File path: docs/ops/state/savepoints.zh.md
 ##
 @@ -78,160 +68,158 @@ source-id   | State of StatefulSource
 mapper-id   | State of StatefulMapper
 {% endhighlight %}
 
-In the above example, the print sink is stateless and hence not part of the 
savepoint state. By default, we try to map each entry of the savepoint back to 
the new program.
+在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 
的每个条目映射回新程序。
 
-## Operations
+## 算子
 
-You can use the [command line client]({{ site.baseurl 
}}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a 
savepoint*, *resume from savepoints*, and *dispose savepoints*.
+你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint 
*,*取消具有 Savepoint *的作业,*从 Savepoint *恢复,以及*部署 Savepoint *。
 
-With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the 
webui.
+使用 Flink >= 1.2.0,还可以使用 webui *从 Savepoint 恢复*。
 
 Review comment:
   `使用 Flink >= 1.2.0` --> `从 Flink 1.2.0 开始`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] 
Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r280980436
 
 

 ##
 File path: docs/ops/state/savepoints.zh.md
 ##
 @@ -78,160 +68,158 @@ source-id   | State of StatefulSource
 mapper-id   | State of StatefulMapper
 {% endhighlight %}
 
-In the above example, the print sink is stateless and hence not part of the 
savepoint state. By default, we try to map each entry of the savepoint back to 
the new program.
+在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 
的每个条目映射回新程序。
 
-## Operations
+## 算子
 
-You can use the [command line client]({{ site.baseurl 
}}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a 
savepoint*, *resume from savepoints*, and *dispose savepoints*.
+你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint 
*,*取消具有 Savepoint *的作业,*从 Savepoint *恢复,以及*部署 Savepoint *。
 
 Review comment:
   `*取消具有 Savepoint *的作业` --> `*触发 Savepoint 并取消作业*`
   
   `*部署 Savepoint *` -->  `删除 Savepoint`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] 
Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r280980604
 
 

 ##
 File path: docs/ops/state/savepoints.zh.md
 ##
 @@ -78,160 +68,158 @@ source-id   | State of StatefulSource
 mapper-id   | State of StatefulMapper
 {% endhighlight %}
 
-In the above example, the print sink is stateless and hence not part of the 
savepoint state. By default, we try to map each entry of the savepoint back to 
the new program.
+在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 
的每个条目映射回新程序。
 
-## Operations
+## 算子
 
-You can use the [command line client]({{ site.baseurl 
}}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a 
savepoint*, *resume from savepoints*, and *dispose savepoints*.
+你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint 
*,*取消具有 Savepoint *的作业,*从 Savepoint *恢复,以及*部署 Savepoint *。
 
-With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the 
webui.
+使用 Flink >= 1.2.0,还可以使用 webui *从 Savepoint 恢复*。
 
-### Triggering Savepoints
+### 触发 Savepoint
 
-When triggering a savepoint, a new savepoint directory is created where the 
data as well as the meta data will be stored. The location of this directory 
can be controlled by [configuring a default target directory](#configuration) 
or by specifying a custom target directory with the trigger commands (see the 
[`:targetDirectory` argument](#trigger-a-savepoint)).
+当触发 Savepoint 时,将创建一个新的 Savepoint 
目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory
 `参数](#trigger-a-savepoint)来控制该目录的位置。
 
 
-Attention: The target directory has to be a location 
accessible by both the JobManager(s) and TaskManager(s) e.g. a location on a 
distributed file-system.
+注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 
都可以访问的位置,例如分布式文件系统上的位置。
 
 
-For example with a `FsStateBackend` or `RocksDBStateBackend`:
+例如,使用 `FsStateBackend`  或 `RocksDBStateBackend` :
 
 {% highlight shell %}
-# Savepoint target directory
-/savepoints/
+# Savepoint 目标目录
+/Savepoint/
 
-# Savepoint directory
-/savepoints/savepoint-:shortjobid-:savepointid/
+# Savepoint 目录
+/Savepoint/savepoint-:shortjobid-:savepointid/
 
-# Savepoint file contains the checkpoint meta data
-/savepoints/savepoint-:shortjobid-:savepointid/_metadata
+# Savepoint 文件包含 Checkpoint元数据
+/Savepoint/savepoint-:shortjobid-:savepointid/_metadata
 
-# Savepoint state
-/savepoints/savepoint-:shortjobid-:savepointid/...
+# Savepoint 状态
+/Savepoint/savepoint-:shortjobid-:savepointid/...
 {% endhighlight %}
 
 
-  Note:
-Although it looks as if the savepoints may be moved, it is currently not 
possible due to absolute paths in the _metadata file.
-Please follow https://issues.apache.org/jira/browse/FLINK-5778;>FLINK-5778 for 
progress on lifting this restriction.
+  注意:
+虽然看起来好像可以移动 Savepoint ,但由于 _metadata 文件中的绝对路径,目前无法进行保存。
+请按照https://issues.apache.org/jira/browse/FLINK-5778;> FLINK-5778 
了解取消此限制的进度。
 
-
-Note that if you use the `MemoryStateBackend`, metadata *and* savepoint state 
will be stored in the `_metadata` file. Since it is self-contained, you may 
move the file and restore from any location.
+请注意,如果使用 `MemoryStateBackend`,则元数据*和*  Savepoint 状态将存储在`_metadata`文件中。 
由于它是自包含的,你可以移动文件并从任何位置恢复。
 
 
-  Attention: It is discouraged to move or delete the last 
savepoint of a running job, because this might interfere with failure-recovery. 
Savepoints have side-effects on exactly-once sinks, therefore 
-  to ensure exactly-once semantics, if there is no checkpoint after the last 
savepoint, the savepoint will be used for recovery. 
+  注意: 不建议移动或删除正在运行作业的最后一个 Savepoint 
,因为这可能会干扰故障恢复。因此,Savepoint 对完全一次的接收器有副作用,为了确保精确的一次语义,如果在最后一个 Savepoint 之后没有 
Checkpoint ,那么将使用 Savepoint 进行恢复。
 
 Review comment:
   `完全一次` --> `精确一次`
   
   `精确的一次语义` --> `精确一次的语义`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] 
Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r280980635
 
 

 ##
 File path: docs/ops/state/savepoints.zh.md
 ##
 @@ -78,160 +68,158 @@ source-id   | State of StatefulSource
 mapper-id   | State of StatefulMapper
 {% endhighlight %}
 
-In the above example, the print sink is stateless and hence not part of the 
savepoint state. By default, we try to map each entry of the savepoint back to 
the new program.
+在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 
的每个条目映射回新程序。
 
-## Operations
+## 算子
 
-You can use the [command line client]({{ site.baseurl 
}}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a 
savepoint*, *resume from savepoints*, and *dispose savepoints*.
+你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint 
*,*取消具有 Savepoint *的作业,*从 Savepoint *恢复,以及*部署 Savepoint *。
 
-With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the 
webui.
+使用 Flink >= 1.2.0,还可以使用 webui *从 Savepoint 恢复*。
 
-### Triggering Savepoints
+### 触发 Savepoint
 
-When triggering a savepoint, a new savepoint directory is created where the 
data as well as the meta data will be stored. The location of this directory 
can be controlled by [configuring a default target directory](#configuration) 
or by specifying a custom target directory with the trigger commands (see the 
[`:targetDirectory` argument](#trigger-a-savepoint)).
+当触发 Savepoint 时,将创建一个新的 Savepoint 
目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory
 `参数](#trigger-a-savepoint)来控制该目录的位置。
 
 
-Attention: The target directory has to be a location 
accessible by both the JobManager(s) and TaskManager(s) e.g. a location on a 
distributed file-system.
+注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 
都可以访问的位置,例如分布式文件系统上的位置。
 
 
-For example with a `FsStateBackend` or `RocksDBStateBackend`:
+例如,使用 `FsStateBackend`  或 `RocksDBStateBackend` :
 
 {% highlight shell %}
-# Savepoint target directory
-/savepoints/
+# Savepoint 目标目录
+/Savepoint/
 
-# Savepoint directory
-/savepoints/savepoint-:shortjobid-:savepointid/
+# Savepoint 目录
+/Savepoint/savepoint-:shortjobid-:savepointid/
 
-# Savepoint file contains the checkpoint meta data
-/savepoints/savepoint-:shortjobid-:savepointid/_metadata
+# Savepoint 文件包含 Checkpoint元数据
+/Savepoint/savepoint-:shortjobid-:savepointid/_metadata
 
-# Savepoint state
-/savepoints/savepoint-:shortjobid-:savepointid/...
+# Savepoint 状态
+/Savepoint/savepoint-:shortjobid-:savepointid/...
 {% endhighlight %}
 
 
-  Note:
-Although it looks as if the savepoints may be moved, it is currently not 
possible due to absolute paths in the _metadata file.
-Please follow https://issues.apache.org/jira/browse/FLINK-5778;>FLINK-5778 for 
progress on lifting this restriction.
+  注意:
+虽然看起来好像可以移动 Savepoint ,但由于 _metadata 文件中的绝对路径,目前无法进行保存。
+请按照https://issues.apache.org/jira/browse/FLINK-5778;> FLINK-5778 
了解取消此限制的进度。
 
-
-Note that if you use the `MemoryStateBackend`, metadata *and* savepoint state 
will be stored in the `_metadata` file. Since it is self-contained, you may 
move the file and restore from any location.
+请注意,如果使用 `MemoryStateBackend`,则元数据*和*  Savepoint 状态将存储在`_metadata`文件中。 
由于它是自包含的,你可以移动文件并从任何位置恢复。
 
 
-  Attention: It is discouraged to move or delete the last 
savepoint of a running job, because this might interfere with failure-recovery. 
Savepoints have side-effects on exactly-once sinks, therefore 
-  to ensure exactly-once semantics, if there is no checkpoint after the last 
savepoint, the savepoint will be used for recovery. 
+  注意: 不建议移动或删除正在运行作业的最后一个 Savepoint 
,因为这可能会干扰故障恢复。因此,Savepoint 对完全一次的接收器有副作用,为了确保精确的一次语义,如果在最后一个 Savepoint 之后没有 
Checkpoint ,那么将使用 Savepoint 进行恢复。
 
 
- Trigger a Savepoint
+
+ 触发 Savepoint
 
 {% highlight shell %}
 $ bin/flink savepoint :jobId [:targetDirectory]
 {% endhighlight %}
 
-This will trigger a savepoint for the job with ID `:jobId`, and returns the 
path of the created savepoint. You need this path to restore and dispose 
savepoints.
+这将触发ID为`:jobId`的作业的Savepoint,并返回创建的 Savepoint 的路径。 你需要此路径来还原和部署 Savepoint 。
 
- Trigger a Savepoint with YARN
+ 使用 YARN 触发 Savepoint
 
 {% highlight shell %}
 $ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
 {% endhighlight %}
 
-This will trigger a savepoint for the job with ID `:jobId` and YARN 
application ID `:yarnAppId`, and returns the path of the created savepoint.
+这将触发 ID 为`:jobId` 和 YARN 应用程序 ID `:yarnAppId`的作业的 Savepoint,并返回创建的 Savepoint 
的路径。
 
- Cancel Job with Savepoint
+ 使用 Savepoint 取消作业
 
 {% highlight shell %}
 $ bin/flink cancel -s [:targetDirectory] :jobId
 {% endhighlight %}
 
-This will atomically trigger a savepoint for the job with ID `:jobid` and 
cancel the job. Furthermore, you can specify a target file system directory to 
store the savepoint in.  The directory 

[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] 
Translate "Working with state" into Chinese
URL: https://github.com/apache/flink/pull/8341#discussion_r280980168
 
 

 ##
 File path: docs/dev/stream/state/state.zh.md
 ##
 @@ -22,122 +22,87 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document explains how to use Flink's state abstractions when developing 
an application.
-
-* ToC
+本文档主要介绍如何在 Flink 作业中使用状态
+* 目录
 {:toc}
 
 ## Keyed State and Operator State
 
-There are two basic kinds of state in Flink: `Keyed State` and `Operator 
State`.
+Flink 中有两种基本的状态:`Keyed State` 和 `Operator State`
 
 ### Keyed State
 
-*Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
+*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。
 
-You can think of Keyed State as Operator State that has been partitioned,
-or sharded, with exactly one state-partition per key.
-Each keyed-state is logically bound to a unique
-composite of , and since each key
-"belongs" to exactly one parallel instance of a keyed operator, we can
-think of this simply as .
+你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。
+逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于"
+算子的一个并发,因此简化为 <算子, key>。
 
-Keyed State is further organized into so-called *Key Groups*. Key Groups are 
the
-atomic unit by which Flink can redistribute Keyed State;
-there are exactly as many Key Groups as the defined maximum parallelism.
-During execution each parallel instance of a keyed operator works with the keys
-for one or more Key Groups.
+Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元;
+Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group
 
 ### Operator State
 
-With *Operator State* (or *non-keyed state*), each operator state is
-bound to one parallel operator instance.
-The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good 
motivating example for the use of Operator State
-in Flink. Each parallel instance of the Kafka consumer maintains a map
-of topic partitions and offsets as its Operator State.
+对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。
+[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 
operator state 的一个很好的示例。
+每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。
 
-The Operator State interfaces support redistributing state among
-parallel operator instances when the parallelism is changed. There can be 
different schemes for doing this redistribution.
+Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。
 
 ## Raw and Managed State
 
-*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
+*Keyed State* 和 *Operator State* 分别有两种存在形式:*managed* and *raw*.
 
-*Managed State* is represented in data structures controlled by the Flink 
runtime, such as internal hash tables, or RocksDB.
-Examples are "ValueState", "ListState", etc. Flink's runtime encodes
-the states and writes them into the checkpoints.
+*Managed State* 有 Flink runtime 中的数据结构所控制,比如内部的 hash table 或者 RocksDB。
+比如 "ValueState", "ListState" 等。Flink runtime 会对这些状态进行编码并写入 checkpoint。
 
-*Raw State* is state that operators keep in their own data structures. When 
checkpointed, they only write a sequence of bytes into
-the checkpoint. Flink knows nothing about the state's data structures and sees 
only the raw bytes.
+*Raw State* 则保存在算子自己的数据结构中。checkpoint 的时候,Flink 并不知晓具体的内容,仅仅写入一串字节序列到 
checkpoint。
 
-All datastream functions can use managed state, but the raw state interfaces 
can only be used when implementing operators.
-Using managed state (rather than raw state) is recommended, since with
-managed state Flink is able to automatically redistribute state when the 
parallelism is
-changed, and also do better memory management.
+所有 datastream 的方法都可以使用 managed state, 但是 raw state 则只能在实现算子的时候使用。
+由于 Flink 可以在修改并发是更好的重新分发状态数据,并且能够更好的管理内存,因此建议使用 managed state(而不是 raw state)。
 
-Attention If your managed state needs 
custom serialization logic, please see 
-the [corresponding guide](custom_serialization.html) in order to ensure future 
compatibility. Flink's default serializers 
-don't need special treatment.
+注意 如果你的 managed state 需要定制化的序列化逻辑,
+为了后续的兼容性请参考 [corresponding guide](custom_serialization.html),Flink 
默认提供的序列化器不需要用户做特殊的处理。
 
 ## Using Managed Keyed State
 
-The managed keyed state interface provides access to different types of state 
that are all scoped to
-the key of the current input element. This means that this type of state can 
only be used
-on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
+managed keyed state 接口提供不同类型状态访问的接口,这些状态都限定于当前的输入数据。换句话说,这些状态仅可在 `KeyedStream`
 
 Review comment:
   `不同类型状态访问的接口` --> `不同类型状态的访问接口`
   
   `这些状态都限定于当前的输入数据` --> `这些状态以当前输入数据为键`


This is an automated 

[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] 
Translate "Working with state" into Chinese
URL: https://github.com/apache/flink/pull/8341#discussion_r280980039
 
 

 ##
 File path: docs/dev/stream/state/state.zh.md
 ##
 @@ -22,122 +22,87 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document explains how to use Flink's state abstractions when developing 
an application.
-
-* ToC
+本文档主要介绍如何在 Flink 作业中使用状态
+* 目录
 {:toc}
 
 ## Keyed State and Operator State
 
-There are two basic kinds of state in Flink: `Keyed State` and `Operator 
State`.
+Flink 中有两种基本的状态:`Keyed State` 和 `Operator State`
 
 ### Keyed State
 
-*Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
+*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。
 
-You can think of Keyed State as Operator State that has been partitioned,
-or sharded, with exactly one state-partition per key.
-Each keyed-state is logically bound to a unique
-composite of , and since each key
-"belongs" to exactly one parallel instance of a keyed operator, we can
-think of this simply as .
+你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。
+逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于"
+算子的一个并发,因此简化为 <算子, key>。
 
-Keyed State is further organized into so-called *Key Groups*. Key Groups are 
the
-atomic unit by which Flink can redistribute Keyed State;
-there are exactly as many Key Groups as the defined maximum parallelism.
-During execution each parallel instance of a keyed operator works with the keys
-for one or more Key Groups.
+Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元;
+Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group
 
 ### Operator State
 
-With *Operator State* (or *non-keyed state*), each operator state is
-bound to one parallel operator instance.
-The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good 
motivating example for the use of Operator State
-in Flink. Each parallel instance of the Kafka consumer maintains a map
-of topic partitions and offsets as its Operator State.
+对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。
+[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 
operator state 的一个很好的示例。
+每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。
 
-The Operator State interfaces support redistributing state among
-parallel operator instances when the parallelism is changed. There can be 
different schemes for doing this redistribution.
+Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。
 
 ## Raw and Managed State
 
-*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
+*Keyed State* 和 *Operator State* 分别有两种存在形式:*managed* and *raw*.
 
-*Managed State* is represented in data structures controlled by the Flink 
runtime, such as internal hash tables, or RocksDB.
-Examples are "ValueState", "ListState", etc. Flink's runtime encodes
-the states and writes them into the checkpoints.
+*Managed State* 有 Flink runtime 中的数据结构所控制,比如内部的 hash table 或者 RocksDB。
 
 Review comment:
   `有 Flink runtime ` --> `由 Flink runtime `


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] 
Translate "Working with state" into Chinese
URL: https://github.com/apache/flink/pull/8341#discussion_r280979940
 
 

 ##
 File path: docs/dev/stream/state/state.zh.md
 ##
 @@ -22,122 +22,87 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document explains how to use Flink's state abstractions when developing 
an application.
-
-* ToC
+本文档主要介绍如何在 Flink 作业中使用状态
+* 目录
 {:toc}
 
 ## Keyed State and Operator State
 
-There are two basic kinds of state in Flink: `Keyed State` and `Operator 
State`.
+Flink 中有两种基本的状态:`Keyed State` 和 `Operator State`
 
 ### Keyed State
 
-*Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
+*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。
 
-You can think of Keyed State as Operator State that has been partitioned,
-or sharded, with exactly one state-partition per key.
-Each keyed-state is logically bound to a unique
-composite of , and since each key
-"belongs" to exactly one parallel instance of a keyed operator, we can
-think of this simply as .
+你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。
+逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于"
+算子的一个并发,因此简化为 <算子, key>。
 
-Keyed State is further organized into so-called *Key Groups*. Key Groups are 
the
-atomic unit by which Flink can redistribute Keyed State;
-there are exactly as many Key Groups as the defined maximum parallelism.
-During execution each parallel instance of a keyed operator works with the keys
-for one or more Key Groups.
+Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元;
+Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group
 
 ### Operator State
 
-With *Operator State* (or *non-keyed state*), each operator state is
-bound to one parallel operator instance.
-The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good 
motivating example for the use of Operator State
-in Flink. Each parallel instance of the Kafka consumer maintains a map
-of topic partitions and offsets as its Operator State.
+对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。
+[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 
operator state 的一个很好的示例。
 
 Review comment:
   `是 Flink 中使用 operator state 的一个很好的示例。` --> `是 Flink 中一个使用 operator state 
的好示例。`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] 
Translate "Working with state" into Chinese
URL: https://github.com/apache/flink/pull/8341#discussion_r280979906
 
 

 ##
 File path: docs/dev/stream/state/state.zh.md
 ##
 @@ -22,122 +22,87 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document explains how to use Flink's state abstractions when developing 
an application.
-
-* ToC
+本文档主要介绍如何在 Flink 作业中使用状态
+* 目录
 {:toc}
 
 ## Keyed State and Operator State
 
-There are two basic kinds of state in Flink: `Keyed State` and `Operator 
State`.
+Flink 中有两种基本的状态:`Keyed State` 和 `Operator State`
 
 ### Keyed State
 
-*Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
+*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。
 
-You can think of Keyed State as Operator State that has been partitioned,
-or sharded, with exactly one state-partition per key.
-Each keyed-state is logically bound to a unique
-composite of , and since each key
-"belongs" to exactly one parallel instance of a keyed operator, we can
-think of this simply as .
+你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。
+逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于"
+算子的一个并发,因此简化为 <算子, key>。
 
-Keyed State is further organized into so-called *Key Groups*. Key Groups are 
the
-atomic unit by which Flink can redistribute Keyed State;
-there are exactly as many Key Groups as the defined maximum parallelism.
-During execution each parallel instance of a keyed operator works with the keys
-for one or more Key Groups.
+Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元;
+Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group
 
 Review comment:
   `Key Groups` can be `Key Group`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] 
Translate "Working with state" into Chinese
URL: https://github.com/apache/flink/pull/8341#discussion_r280980096
 
 

 ##
 File path: docs/dev/stream/state/state.zh.md
 ##
 @@ -22,122 +22,87 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document explains how to use Flink's state abstractions when developing 
an application.
-
-* ToC
+本文档主要介绍如何在 Flink 作业中使用状态
+* 目录
 {:toc}
 
 ## Keyed State and Operator State
 
-There are two basic kinds of state in Flink: `Keyed State` and `Operator 
State`.
+Flink 中有两种基本的状态:`Keyed State` 和 `Operator State`
 
 ### Keyed State
 
-*Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
+*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。
 
-You can think of Keyed State as Operator State that has been partitioned,
-or sharded, with exactly one state-partition per key.
-Each keyed-state is logically bound to a unique
-composite of , and since each key
-"belongs" to exactly one parallel instance of a keyed operator, we can
-think of this simply as .
+你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。
+逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于"
+算子的一个并发,因此简化为 <算子, key>。
 
-Keyed State is further organized into so-called *Key Groups*. Key Groups are 
the
-atomic unit by which Flink can redistribute Keyed State;
-there are exactly as many Key Groups as the defined maximum parallelism.
-During execution each parallel instance of a keyed operator works with the keys
-for one or more Key Groups.
+Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元;
+Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group
 
 ### Operator State
 
-With *Operator State* (or *non-keyed state*), each operator state is
-bound to one parallel operator instance.
-The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good 
motivating example for the use of Operator State
-in Flink. Each parallel instance of the Kafka consumer maintains a map
-of topic partitions and offsets as its Operator State.
+对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。
+[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 
operator state 的一个很好的示例。
+每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。
 
-The Operator State interfaces support redistributing state among
-parallel operator instances when the parallelism is changed. There can be 
different schemes for doing this redistribution.
+Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。
 
 ## Raw and Managed State
 
-*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
+*Keyed State* 和 *Operator State* 分别有两种存在形式:*managed* and *raw*.
 
-*Managed State* is represented in data structures controlled by the Flink 
runtime, such as internal hash tables, or RocksDB.
-Examples are "ValueState", "ListState", etc. Flink's runtime encodes
-the states and writes them into the checkpoints.
+*Managed State* 有 Flink runtime 中的数据结构所控制,比如内部的 hash table 或者 RocksDB。
+比如 "ValueState", "ListState" 等。Flink runtime 会对这些状态进行编码并写入 checkpoint。
 
-*Raw State* is state that operators keep in their own data structures. When 
checkpointed, they only write a sequence of bytes into
-the checkpoint. Flink knows nothing about the state's data structures and sees 
only the raw bytes.
+*Raw State* 则保存在算子自己的数据结构中。checkpoint 的时候,Flink 并不知晓具体的内容,仅仅写入一串字节序列到 
checkpoint。
 
-All datastream functions can use managed state, but the raw state interfaces 
can only be used when implementing operators.
-Using managed state (rather than raw state) is recommended, since with
-managed state Flink is able to automatically redistribute state when the 
parallelism is
-changed, and also do better memory management.
+所有 datastream 的方法都可以使用 managed state, 但是 raw state 则只能在实现算子的时候使用。
+由于 Flink 可以在修改并发是更好的重新分发状态数据,并且能够更好的管理内存,因此建议使用 managed state(而不是 raw state)。
 
-Attention If your managed state needs 
custom serialization logic, please see 
-the [corresponding guide](custom_serialization.html) in order to ensure future 
compatibility. Flink's default serializers 
-don't need special treatment.
+注意 如果你的 managed state 需要定制化的序列化逻辑,
+为了后续的兼容性请参考 [corresponding guide](custom_serialization.html),Flink 
默认提供的序列化器不需要用户做特殊的处理。
 
 Review comment:
   `corresponding guide` --> `相应指南`
   
   `Flink 默认提供的序列化器` --> `Flink 的默认序列化器`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] 
Translate "Working with state" into Chinese
URL: https://github.com/apache/flink/pull/8341#discussion_r280979970
 
 

 ##
 File path: docs/dev/stream/state/state.zh.md
 ##
 @@ -22,122 +22,87 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document explains how to use Flink's state abstractions when developing 
an application.
-
-* ToC
+本文档主要介绍如何在 Flink 作业中使用状态
+* 目录
 {:toc}
 
 ## Keyed State and Operator State
 
-There are two basic kinds of state in Flink: `Keyed State` and `Operator 
State`.
+Flink 中有两种基本的状态:`Keyed State` 和 `Operator State`
 
 ### Keyed State
 
-*Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
+*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。
 
-You can think of Keyed State as Operator State that has been partitioned,
-or sharded, with exactly one state-partition per key.
-Each keyed-state is logically bound to a unique
-composite of , and since each key
-"belongs" to exactly one parallel instance of a keyed operator, we can
-think of this simply as .
+你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。
+逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于"
+算子的一个并发,因此简化为 <算子, key>。
 
-Keyed State is further organized into so-called *Key Groups*. Key Groups are 
the
-atomic unit by which Flink can redistribute Keyed State;
-there are exactly as many Key Groups as the defined maximum parallelism.
-During execution each parallel instance of a keyed operator works with the keys
-for one or more Key Groups.
+Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元;
+Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group
 
 ### Operator State
 
-With *Operator State* (or *non-keyed state*), each operator state is
-bound to one parallel operator instance.
-The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good 
motivating example for the use of Operator State
-in Flink. Each parallel instance of the Kafka consumer maintains a map
-of topic partitions and offsets as its Operator State.
+对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。
+[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 
operator state 的一个很好的示例。
+每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。
 
-The Operator State interfaces support redistributing state among
-parallel operator instances when the parallelism is changed. There can be 
different schemes for doing this redistribution.
+Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。
 
 Review comment:
   `Operator State 在 Flink 作业的并发改变后` --> `Operator State 在 Flink 作业改变并发后`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] 
Translate "Working with state" into Chinese
URL: https://github.com/apache/flink/pull/8341#discussion_r280980068
 
 

 ##
 File path: docs/dev/stream/state/state.zh.md
 ##
 @@ -22,122 +22,87 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document explains how to use Flink's state abstractions when developing 
an application.
-
-* ToC
+本文档主要介绍如何在 Flink 作业中使用状态
+* 目录
 {:toc}
 
 ## Keyed State and Operator State
 
-There are two basic kinds of state in Flink: `Keyed State` and `Operator 
State`.
+Flink 中有两种基本的状态:`Keyed State` 和 `Operator State`
 
 ### Keyed State
 
-*Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
+*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。
 
-You can think of Keyed State as Operator State that has been partitioned,
-or sharded, with exactly one state-partition per key.
-Each keyed-state is logically bound to a unique
-composite of , and since each key
-"belongs" to exactly one parallel instance of a keyed operator, we can
-think of this simply as .
+你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。
+逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于"
+算子的一个并发,因此简化为 <算子, key>。
 
-Keyed State is further organized into so-called *Key Groups*. Key Groups are 
the
-atomic unit by which Flink can redistribute Keyed State;
-there are exactly as many Key Groups as the defined maximum parallelism.
-During execution each parallel instance of a keyed operator works with the keys
-for one or more Key Groups.
+Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元;
+Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group
 
 ### Operator State
 
-With *Operator State* (or *non-keyed state*), each operator state is
-bound to one parallel operator instance.
-The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good 
motivating example for the use of Operator State
-in Flink. Each parallel instance of the Kafka consumer maintains a map
-of topic partitions and offsets as its Operator State.
+对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。
+[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 
operator state 的一个很好的示例。
+每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。
 
-The Operator State interfaces support redistributing state among
-parallel operator instances when the parallelism is changed. There can be 
different schemes for doing this redistribution.
+Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。
 
 ## Raw and Managed State
 
-*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
+*Keyed State* 和 *Operator State* 分别有两种存在形式:*managed* and *raw*.
 
-*Managed State* is represented in data structures controlled by the Flink 
runtime, such as internal hash tables, or RocksDB.
-Examples are "ValueState", "ListState", etc. Flink's runtime encodes
-the states and writes them into the checkpoints.
+*Managed State* 有 Flink runtime 中的数据结构所控制,比如内部的 hash table 或者 RocksDB。
+比如 "ValueState", "ListState" 等。Flink runtime 会对这些状态进行编码并写入 checkpoint。
 
-*Raw State* is state that operators keep in their own data structures. When 
checkpointed, they only write a sequence of bytes into
-the checkpoint. Flink knows nothing about the state's data structures and sees 
only the raw bytes.
+*Raw State* 则保存在算子自己的数据结构中。checkpoint 的时候,Flink 并不知晓具体的内容,仅仅写入一串字节序列到 
checkpoint。
 
-All datastream functions can use managed state, but the raw state interfaces 
can only be used when implementing operators.
-Using managed state (rather than raw state) is recommended, since with
-managed state Flink is able to automatically redistribute state when the 
parallelism is
-changed, and also do better memory management.
+所有 datastream 的方法都可以使用 managed state, 但是 raw state 则只能在实现算子的时候使用。
+由于 Flink 可以在修改并发是更好的重新分发状态数据,并且能够更好的管理内存,因此建议使用 managed state(而不是 raw state)。
 
 Review comment:
   `由于 Flink 可以在修改并发是更好的重新分发状态数据` --> `由于 Flink 可以在修改并发时更好的分发状态数据`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese

2019-05-04 Thread GitBox
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] 
Translate "Working with state" into Chinese
URL: https://github.com/apache/flink/pull/8341#discussion_r280979872
 
 

 ##
 File path: docs/dev/stream/state/state.zh.md
 ##
 @@ -22,122 +22,87 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This document explains how to use Flink's state abstractions when developing 
an application.
-
-* ToC
+本文档主要介绍如何在 Flink 作业中使用状态
+* 目录
 {:toc}
 
 ## Keyed State and Operator State
 
-There are two basic kinds of state in Flink: `Keyed State` and `Operator 
State`.
+Flink 中有两种基本的状态:`Keyed State` 和 `Operator State`
 
 ### Keyed State
 
-*Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
+*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。
 
-You can think of Keyed State as Operator State that has been partitioned,
-or sharded, with exactly one state-partition per key.
-Each keyed-state is logically bound to a unique
-composite of , and since each key
-"belongs" to exactly one parallel instance of a keyed operator, we can
-think of this simply as .
+你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。
+逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于"
 
 Review comment:
   maybe `逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定` can be `逻辑上每个 keyed-state 和 
<算子并发实例, key> 绑定`
   
   `由于每个 key 仅"属于"` --> `另外每个 key 仅"属于"`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12400) NullpointerException using SimpleStringSchema with Kafka

2019-05-04 Thread Pierre Zemb (JIRA)
Pierre Zemb created FLINK-12400:
---

 Summary: NullpointerException using SimpleStringSchema with Kafka
 Key: FLINK-12400
 URL: https://issues.apache.org/jira/browse/FLINK-12400
 Project: Flink
  Issue Type: Improvement
  Components: API / Type Serialization System
Affects Versions: 1.8.0, 1.7.2
 Environment: Flink 1.7.2 job on 1.8 cluster
Kafka 0.10 with a topic in log-compaction
Reporter: Pierre Zemb
Assignee: Pierre Zemb


Hi!

Yesterday, we saw some strange behavior with our Flink job and Kafka. We are 
consuming a Kafka topic set up in 
[log-compaction|https://kafka.apache.org/documentation/#compaction] mode. As 
such, sending a message with a null payload acts as a tombstone.

We are consuming Kafka like this:

{code:java}
new FlinkKafkaConsumer010<>  ("topic", new SimpleStringSchema(), 
this.kafkaProperties)
{code}

When we sent such a message, the job failed because of a NullPointerException 
[here|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java#L75]:
 `byte[] message` was null, causing the NPE. 

We forked the class and added a basic null check, returning null in that case. 
That fixed our issue. 
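
For reference, a minimal sketch of the kind of null guard we added (the wrapper 
class name below is made up; only the null check matters):

{code:java}
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Illustrative wrapper: behaves like SimpleStringSchema but maps Kafka
// tombstones (null payloads on a log-compacted topic) to null instead of
// throwing a NullPointerException.
public class NullTolerantStringSchema implements DeserializationSchema<String> {

    private final SimpleStringSchema delegate = new SimpleStringSchema();

    @Override
    public String deserialize(byte[] message) {
        return message == null ? null : delegate.deserialize(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
{code}

It drops in where the original schema was used, e.g. {{new 
FlinkKafkaConsumer010<>("topic", new NullTolerantStringSchema(), 
this.kafkaProperties)}}.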

Should we add it to the main class?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] PierreZ commented on a change in pull request #8272: [FLINK-12274] Fix documentation to enable Queryable State

2019-05-04 Thread GitBox
PierreZ commented on a change in pull request #8272: [FLINK-12274] Fix 
documentation to enable Queryable State
URL: https://github.com/apache/flink/pull/8272#discussion_r280978158
 
 

 ##
 File path: docs/dev/stream/state/queryable_state.md
 ##
 @@ -69,10 +69,12 @@ response back to the client.
 
 ## Activating Queryable State
 
-To enable queryable state on your Flink cluster, you just have to copy the 
-`flink-queryable-state-runtime{{ site.scala_version_suffix }}-{{site.version 
}}.jar` 
+To enable queryable state on your Flink cluster, you need to do the following:
+
+ 1. copy the `flink-queryable-state-runtime{{ site.scala_version_suffix 
}}-{{site.version }}.jar` 
 from the `opt/` folder of your [Flink 
distribution](https://flink.apache.org/downloads.html "Apache Flink: 
Downloads"), 
-to the `lib/` folder. Otherwise, the queryable state feature is not enabled. 
+to the `lib/` folder.
+ 2. set the property `queryable-state.enable` to `true` in 
`./conf/flink-conf.yaml`. See the [Configuration]({{ site.baseurl 
}}/ops/config.html#queryable-state) documentation for details and additional 
parameters.
 
 Review comment:
   Thanks to both of you, I'm dropping the last part of the sentence!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12392) FlinkRelMetadataQuery does not compile with Scala 2.12

2019-05-04 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833022#comment-16833022
 ] 

Jark Wu commented on FLINK-12392:
-

cc [~godfreyhe]

> FlinkRelMetadataQuery does not compile with Scala 2.12
> --
>
> Key: FLINK-12392
> URL: https://issues.apache.org/jira/browse/FLINK-12392
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.9.0
>
>
> {code}
> 10:57:51.770 [ERROR] 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala:52:
>  error: value EMPTY in class RelMetadataQuery cannot be accessed in object 
> org.apache.calcite.rel.metadata.RelMetadataQuery
> 10:57:51.770 [ERROR]  Access to protected value EMPTY not permitted because
> 10:57:51.770 [ERROR]  enclosing package metadata in package plan is not a 
> subclass of
> 10:57:51.770 [ERROR]  class RelMetadataQuery in package metadata where target 
> is defined
> 10:57:51.770 [ERROR] this(RelMetadataQuery.THREAD_PROVIDERS.get, 
> RelMetadataQuery.EMPTY)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] klion26 commented on issue #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese

2019-05-04 Thread GitBox
klion26 commented on issue #8341: [FLINK-11633][docs-zh] Translate "Working 
with state" into Chinese
URL: https://github.com/apache/flink/pull/8341#issuecomment-489305163
 
 
   This page contains a lot of information, so I'll recheck the 
translation, and any reviews are welcome.
   @wuchong 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11633) Translate "Working with State" into Chinese

2019-05-04 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11633:
---
Labels: pull-request-available  (was: )

> Translate "Working with State" into Chinese
> ---
>
> Key: FLINK-11633
> URL: https://issues.apache.org/jira/browse/FLINK-11633
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Congxian Qiu(klion26)
>Assignee: Congxian Qiu(klion26)
>Priority: Major
>  Labels: pull-request-available
>
> Doc locates in flink/doc/dev/state/state.md



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese

2019-05-04 Thread GitBox
flinkbot commented on issue #8341: [FLINK-11633][docs-zh] Translate "Working 
with state" into Chinese
URL: https://github.com/apache/flink/pull/8341#issuecomment-489305060
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
   The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 opened a new pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese

2019-05-04 Thread GitBox
klion26 opened a new pull request #8341: [FLINK-11633][docs-zh] Translate 
"Working with state" into Chinese
URL: https://github.com/apache/flink/pull/8341
 
 
   ## What is the purpose of the change
   
   Translate "Working with state" into Chinese
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Xeli commented on a change in pull request #8259: [FLINK-12325][metrics] Fix bug in statsd exporter when using negative values

2019-05-04 Thread GitBox
Xeli commented on a change in pull request #8259: [FLINK-12325][metrics] Fix 
bug in statsd exporter when using negative values
URL: https://github.com/apache/flink/pull/8259#discussion_r280971043
 
 

 ##
 File path: 
flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 ##
 @@ -240,12 +240,6 @@ public String filterCharacters(String input) {
}
 
private boolean numberIsNegative(Number input) {
-   try {
-   return new 
BigDecimal(input.toString()).compareTo(BigDecimal.ZERO) == -1;
-   } catch (Exception e) {
-   //not all Number's can be converted to a BigDecimal, 
such as Infinity or NaN
-   //in this case we just say it isn't a negative number
-   return false;
-   }
+   return input.doubleValue() < 0;
 
 Review comment:
   Ah sorry, no reason! 
   
   I've changed it to Double.compare(), and I've also changed line #190 to use this 
method so the double check there works in the same way.
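
   For context, a small standalone sketch (not part of the PR) of how the plain
   `<` check and `Double.compare()` treat the special values discussed here:

   ```java
   // Standalone sketch: compare a plain '<' check with Double.compare() on the
   // special double values that came up in this discussion.
   public class NegativeCheckDemo {
       public static void main(String[] args) {
           double[] inputs = {-1.5, -0.0, 0.0, Double.NaN, Double.NEGATIVE_INFINITY};
           for (double d : inputs) {
               System.out.printf("%-10s  '<' : %-5b  Double.compare: %b%n",
                   d, d < 0, Double.compare(d, 0.0) < 0);
           }
       }
   }
   ```

   Both treat NaN as non-negative and -Infinity as negative; the only value they
   disagree on is -0.0, which `Double.compare()` orders below 0.0.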


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services