[GitHub] [flink] zhijiangW commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
zhijiangW commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#discussion_r281005469

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java

@@ -162,10 +164,13 @@ void assignExclusiveSegments(List segments) {
     public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
         if (partitionRequestClient == null) {
             // Create a client and request the partition
-            partitionRequestClient = connectionManager
-                .createPartitionRequestClient(connectionId);
+            try {
+                partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
+            } catch (RemoteTransportException ex) {

Review comment:
This should catch `IOException` here and wrap it only when it is a `RemoteTransportException` instance. For a `LocalTransportException`, there should be no need to restart the producer side.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
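The suggestion in this review comment can be sketched as follows. This is only an illustration under stated assumptions: the exception classes, the `ClientFactory` interface, and `RequestSubpartitionSketch` are hypothetical stand-ins that mirror the names in the comment, not the actual Flink classes or the code that ended up in the PR.

```java
import java.io.IOException;

// Hypothetical stand-ins for the exception types named in the review comment.
// They only mirror the names; the real Flink classes are not reproduced here.
class TransportException extends IOException {
    TransportException(String message) { super(message); }
}

class RemoteTransportException extends TransportException {
    RemoteTransportException(String message) { super(message); }
}

class LocalTransportException extends TransportException {
    LocalTransportException(String message) { super(message); }
}

// Hypothetical wrapper that would signal "restart the producer" to the caller.
class DataConsumptionException extends IOException {
    DataConsumptionException(Throwable cause) { super(cause); }
}

// Hypothetical factory standing in for connectionManager.createPartitionRequestClient(...).
interface ClientFactory {
    Object create() throws IOException;
}

final class RequestSubpartitionSketch {
    static Object createClient(ClientFactory factory) throws IOException {
        try {
            return factory.create();
        } catch (IOException ex) {
            if (ex instanceof RemoteTransportException) {
                // Remote failure: the producer side may be gone, so wrap it.
                throw new DataConsumptionException(ex);
            }
            // Local failure (or any other IOException): rethrow unchanged.
            throw ex;
        }
    }
}
```

Catching the broader `IOException` and testing the instance keeps local failures on their existing path, so only remote failures would be reported as a data consumption problem.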
[GitHub] [flink] sunjincheng121 commented on issue #8230: [FLINK-10977][table] Add streaming non-window FlatAggregate to Table API
sunjincheng121 commented on issue #8230: [FLINK-10977][table] Add streaming non-window FlatAggregate to Table API URL: https://github.com/apache/flink/pull/8230#issuecomment-489392533 Thanks for creating the JIRA @hequn8128! +1 to merge. @flinkbot approve all
[jira] [Commented] (FLINK-12401) Support incremental emit for non-window streaming FlatAggregate on Table API
[ https://issues.apache.org/jira/browse/FLINK-12401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833223#comment-16833223 ]

Hequn Cheng commented on FLINK-12401:
-------------------------------------

[~jark] Hi, great that you are also interested in this.

This JIRA is going to support emitting incremental values in either Acc or AccRetract mode. Whether Acc or AccRetract is used will be decided by the query.

As for local-global, I think you raised a good point. Similar to the normal Aggregate, flatAggregate could also support local-global and miniBatch. However, I plan to support these optimizations later. What do you think?

> Support incremental emit for non-window streaming FlatAggregate on Table API
> ----------------------------------------------------------------------------
>
> Key: FLINK-12401
> URL: https://issues.apache.org/jira/browse/FLINK-12401
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Major
>
> As described in
> [Flip-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739],
> there are two output modes for non-window streaming flatAggregate. One
> emits full values, the other emits incremental values.
> [FLINK-10977|https://issues.apache.org/jira/browse/FLINK-10977] supports the
> former; this JIRA is going to support the latter.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] hequn8128 commented on issue #8230: [FLINK-10977][table] Add streaming non-window FlatAggregate to Table API
hequn8128 commented on issue #8230: [FLINK-10977][table] Add streaming non-window FlatAggregate to Table API URL: https://github.com/apache/flink/pull/8230#issuecomment-489390597 @sunjincheng121 Thank you for the reminder. I have created another jira([FLINK-12401](https://issues.apache.org/jira/browse/FLINK-12401)) for the incremental mode. Best, Hequn
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281004053 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */
+public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
+
+    private final SchedulerOperations schedulerOperations;
+
+    private final SchedulingTopology schedulingTopology;
+
+    private final DeploymentOption deploymentOption = new DeploymentOption(true);
+
+    private final JobGraph jobGraph;
+
+    public LazyFromSourcesSchedulingStrategy(
+        SchedulerOperations schedulerOperations,
+        SchedulingTopology schedulingTopology,
+        JobGraph jobGraph) {
+        this.schedulerOperations = checkNotNull(schedulerOperations);
+        this.schedulingTopology = checkNotNull(schedulingTopology);
+        this.jobGraph = checkNotNull(jobGraph);
+    }
+
+    @Override
+    public void startScheduling() {
+        List executionVertexDeploymentOptions = new LinkedList<>();
+        for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) {
+            if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) {
+                // schedule vertices without consumed result partition
+                executionVertexDeploymentOptions.add(
+                    new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption));
+            }
+        }
+        if (!executionVertexDeploymentOptions.isEmpty()) {
+            schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+        }
+    }
+
+    @Override
+    public void restartTasks(Set verticesNeedingRestart) {
+        List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size());
+        for (ExecutionVertexID executionVertexID : verticesNeedingRestart) {
+            executionVertexDeploymentOptions.add(
+                new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
+        }
+        schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
+    }
+
+    @Override
+    public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) {

Review comment:
Scheduling vertices on vertex state transitions has the benefit of avoiding a flood of `onPartitionConsumable` notifications. However, there may be operators that take a very long time before they produce a result in PIPELINED shuffle mode. So I think we could keep both benefits by relying on both vertex state transitions and `onPartitionConsumable` notifications.
1. Set `DeploymentOption#sendScheduleOrUpdateConsumerMessage` to true only if the vertex has a PIPELINED produced result partition.
2. Schedule vertices with BLOCKING input result partitions using vertex state transitions.
3. Schedule vertices with
[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-489389513

Thanks for your professional reviews and valuable suggestions, @tillrohrmann. I think there are mainly two issues to be confirmed:

1. Do we need a new, special `DataConsumptionException`, or can we use the existing `PartitionNotFoundException`?

Semantically, `PartitionNotFoundException` is actually a subtype of `DataConsumptionException`. Functionally, they currently provide the same information, so I agree to maintain only one exception type for simplicity. That means using `PartitionNotFoundException` directly, or renaming it to cover more semantics.

2. In which cases should we report this `PartitionNotFoundException` so that the JobMaster restarts the producer?

The cases you mentioned above are very helpful, and I am trying to specify each of them further:

-> TaskExecutor no longer reachable
  - TE unavailable while requesting the sub partition: an executor shutdown/kill causes an exception when establishing the network connection or requesting the sub partition, which is covered by `RemoteInputChannel#requestSubpartition` in the PR.
  - TE unavailable while transferring partition data: an executor shutdown/kill makes the network channel inactive, which is also covered by `RemoteInputChannel#onError` in the PR.

-> TaskExecutor reachable but result partition not available
  This is covered by `RemoteInputChannel#failPartitionRequest` in the PR.

-> TaskExecutor reachable, result partition available, but sub partition view cannot be created (spilled file has been removed)
  The producer responds with an `ErrorResponse` to the sub partition request, and the consumer then calls `RemoteInputChannel#onError` to cover this case. Because this `ErrorResponse` is a fatal error, it makes all the remote input channels report `DataConsumptionException`. So it may be better to wrap `DataConsumptionException` on the producer side in this case, to avoid affecting all the remote input channels.

Besides the three cases above, there is another cause: partition data corrupted/deleted during transfer, which is currently covered by `PartitionRequestQueue#exceptionCaught` in the PR. The consumer then receives an `ErrorResponse` and calls `RemoteInputChannel#onError`. For this case I once considered wrapping `DataConsumptionException` directly on the producer side, specifically in `SpilledSubpartitionView#getNextBuffer`, to avoid reusing the general `ErrorResponse`, because you might think that some internal network exceptions delivered via `ErrorResponse` do not require restarting the producer side.

My previous thought was that some network exceptions, including environment/hardware issues, might also justify restarting the producer on other executors; otherwise the consumer might hit the same problem again after restarting and consuming data from the previous executor. So the `ErrorResponse` with the fatal property is easy to reuse to represent the partition problem here. Alternatively, we could treat network environment/hardware problems as a separate issue to be solved, and wrap the special `DataConsumptionException` in the specific views to distinguish it from the existing `ErrorResponse`.
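The concern about a fatal `ErrorResponse` affecting all remote input channels can be illustrated with a small sketch. It assumes, as the comment describes, that an error not tied to a single channel is treated as fatal and fans out to every channel on the connection. `ErrorMessage` and `ErrorDispatchSketch` are hypothetical names for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an error message sent from the producer to the consumer.
final class ErrorMessage {
    final String cause;
    final Integer receiverId; // null: not tied to a single channel, treated as fatal

    ErrorMessage(String cause, Integer receiverId) {
        this.cause = cause;
        this.receiverId = receiverId;
    }

    boolean isFatal() {
        return receiverId == null;
    }
}

// Hypothetical consumer-side dispatcher: records which channels see which errors.
final class ErrorDispatchSketch {
    private final Map<Integer, List<String>> errorsPerChannel = new LinkedHashMap<>();

    void registerChannel(int channelId) {
        errorsPerChannel.put(channelId, new ArrayList<>());
    }

    void dispatch(ErrorMessage message) {
        if (message.isFatal()) {
            // Fatal error: every channel sharing the connection is notified.
            for (List<String> errors : errorsPerChannel.values()) {
                errors.add(message.cause);
            }
        } else {
            // Channel-specific error: only the addressed channel is notified.
            List<String> errors = errorsPerChannel.get(message.receiverId);
            if (errors != null) {
                errors.add(message.cause);
            }
        }
    }

    List<String> errorsOf(int channelId) {
        return errorsPerChannel.get(channelId);
    }
}
```

In this sketch, an error addressed to one channel leaves the others untouched, which is the effect the comment hopes to get by wrapping the exception on the producer side.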
[jira] [Commented] (FLINK-12348) Use TableConfig in api module to replace TableConfig in blink-planner module.
[ https://issues.apache.org/jira/browse/FLINK-12348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833212#comment-16833212 ]

Jark Wu commented on FLINK-12348:
---------------------------------

Hi [~twalthr],

I find that the updates to TableConfig mentioned in FLIP-32 are mainly about a builder pattern. But the most important change Blink introduced is making TableConfig configurable using plain key-value pairs (similar to runtime options in {{flink-conf.yaml}}). This way we can set up a cluster or a job using yaml files, for example:

{code:java}
sql.timeZone: UTC
sql.codegen.length.max: 64000
{code}

In order to support this feature, we need a member {{org.apache.flink.configuration.Configuration}} in {{TableConfig}}, and we need to move {{TableConfigOptions}} into the api module.

What do you think? [~twalthr] [~hequn8128] [~dawidwys]

> Use TableConfig in api module to replace TableConfig in blink-planner module.
> -----------------------------------------------------------------------------
>
> Key: FLINK-12348
> URL: https://issues.apache.org/jira/browse/FLINK-12348
> Project: Flink
> Issue Type: Task
> Components: Table SQL / API
> Reporter: Jing Zhang
> Assignee: Jing Zhang
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Since TableConfig was already moved to the API module in
> [FLINK-11067|https://issues.apache.org/jira/browse/FLINK-11067], the TableConfig
> in blink-planner-module should not exist anymore. This issue aims to remove
> the TableConfig in blink-planner-module and use the TableConfig in the API module
> instead.
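A minimal sketch of the key-value idea from this comment: options are stored as plain strings and read back with typed accessors. `KeyValueTableConfigSketch` and its methods are hypothetical and only illustrate the shape of such a config; they are not the `TableConfig` or `Configuration` API, and the line parser is a simplification rather than a YAML parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical config holder backed by plain key-value pairs.
final class KeyValueTableConfigSketch {
    private final Map<String, String> options = new LinkedHashMap<>();

    // Reads simple "key: value" lines; anything without a colon is skipped.
    static KeyValueTableConfigSketch fromLines(Iterable<String> lines) {
        KeyValueTableConfigSketch config = new KeyValueTableConfigSketch();
        for (String line : lines) {
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue;
            }
            String key = line.substring(0, colon).trim();
            String value = line.substring(colon + 1).trim();
            config.options.put(key, value);
        }
        return config;
    }

    String getString(String key, String defaultValue) {
        return options.getOrDefault(key, defaultValue);
    }

    int getInteger(String key, int defaultValue) {
        String raw = options.get(key);
        return raw == null ? defaultValue : Integer.parseInt(raw);
    }
}
```

With the two example entries above, `getString("sql.timeZone", ...)` would yield `UTC` and `getInteger("sql.codegen.length.max", ...)` would yield `64000`, while unknown keys fall back to the supplied default.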
[jira] [Updated] (FLINK-11820) SimpleStringSchema handle message record which value is null
[ https://issues.apache.org/jira/browse/FLINK-11820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu Li updated FLINK-11820:
--------------------------
Affects Version/s: 1.8.0
Fix Version/s: (was: 1.7.3)
Issue Type: Bug (was: Improvement)

> SimpleStringSchema handle message record which value is null
> ------------------------------------------------------------
>
> Key: FLINK-11820
> URL: https://issues.apache.org/jira/browse/FLINK-11820
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.7.2, 1.8.0
> Reporter: lamber-ken
> Assignee: lamber-ken
> Priority: Critical
>
> When a Kafka message queue contains records whose value is null,
> flink-kafka-connector can't process these records.
> For example, a message queue like the one below:
> |msg|{color:#ff}null{color}|msg|msg|msg|msg|
> Normally, +SimpleStringSchema+ is used to process the message queue data:
> {code:java}
> env.addSource(new FlinkKafkaConsumer010("topic", new SimpleStringSchema(),
> properties));
> {code}
> but this results in a NullPointerException:
> {code:java}
> java.lang.NullPointerException
> at java.lang.String.<init>(String.java:515)
> at
> org.apache.flink.api.common.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:75)
> at
> org.apache.flink.api.common.serialization.SimpleStringSchema.deserialize(SimpleStringSchema.java:36)
> {code}
[GitHub] [flink] carp84 commented on issue #7987: [FLINK-11820][serialization] SimpleStringSchema handle message record which value is null
carp84 commented on issue #7987: [FLINK-11820][serialization] SimpleStringSchema handle message record which value is null URL: https://github.com/apache/flink/pull/7987#issuecomment-489388048 @aljoscha @tzulitai Mind taking a look here? Thanks.
[jira] [Resolved] (FLINK-12400) NullpointerException using SimpleStringSchema with Kafka
[ https://issues.apache.org/jira/browse/FLINK-12400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu Li resolved FLINK-12400.
---------------------------
Resolution: Duplicate

> NullpointerException using SimpleStringSchema with Kafka
> --------------------------------------------------------
>
> Key: FLINK-12400
> URL: https://issues.apache.org/jira/browse/FLINK-12400
> Project: Flink
> Issue Type: Improvement
> Components: API / Type Serialization System
> Affects Versions: 1.7.2, 1.8.0
> Environment: Flink 1.7.2 job on 1.8 cluster
> Kafka 0.10 with a topic in log-compaction
> Reporter: Pierre Zemb
> Assignee: Pierre Zemb
> Priority: Minor
>
> Hi!
> Yesterday, we saw strange behavior with our Flink job and Kafka. We are
> consuming a Kafka topic set up in
> [log-compaction|https://kafka.apache.org/documentation/#compaction] mode. As
> such, sending a message with a null payload acts like a tombstone.
> We are consuming Kafka like this:
> {code:java}
> new FlinkKafkaConsumer010<> ("topic", new SimpleStringSchema(),
> this.kafkaProperties)
> {code}
> When we sent the message, the job failed because of a NullPointerException
> [here|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java#L75].
> `byte[] message` was null, causing the NPE.
> We forked the class and added a basic null check, returning null if the message is null.
> It fixed our issue.
> Should we add it to the main class?
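The null check described in this report could look roughly like the sketch below. `NullTolerantStringDeserializer` is a hypothetical, self-contained class that only mirrors the idea; it is not the reporter's fork and does not implement Flink's schema interfaces.

```java
import java.nio.charset.Charset;

// Hypothetical, standalone illustration of a null-tolerant string deserializer.
final class NullTolerantStringDeserializer {
    private final Charset charset;

    NullTolerantStringDeserializer(Charset charset) {
        this.charset = charset;
    }

    String deserialize(byte[] message) {
        if (message == null) {
            // A null payload (for example a tombstone on a compacted topic)
            // is passed through as null instead of reaching new String(...).
            return null;
        }
        return new String(message, charset);
    }
}
```

Without the guard, `new String(message, charset)` throws a `NullPointerException` for a null byte array, which matches the stack trace quoted in FLINK-11820.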
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002555 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Unit test for {@link LazyFromSourcesSchedulingStrategy}. + */ +public class LazyFromSourcesSchedulingStrategyTest extends TestLogger { + + /** +* Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology. 
+*/ + @Test + public void testStartScheduling() { + JobVertex jobVertex1 = new JobVertex("vertex#1"); + JobVertex jobVertex2 = new JobVertex("vertex#2"); + JobGraph graph = new JobGraph(jobVertex1, jobVertex2); + jobVertex2.connectNewDataSetAsInput(jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); + + for (int i = 0; i < 3; i++) { + schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex1.getID(), i)); + schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex2.getID(), i)); + } + + TestingSchedulerOperation testingSchedulerOperation = new TestingSchedulerOperation(); + LazyFromSourcesSchedulingStrategy schedulingStrategy = new LazyFromSourcesSchedulingStrategy( + testingSchedulerOperation, + schedulingTopology, + graph); + + schedulingStrategy.startScheduling(); + + assertEquals(3, testingSchedulerOperation.getScheduledVertices().size()); + } + + /** +* Tests that when on execution state change will start available downstream vertices. +* vertex#0vertex#1 +* \ / +*\ / +* \ / +* (BLOCKING, ALL) +* vertex#3 vertex#2 +* \/ +*\ / +* \/ +* (BLOCKING, ANY) +* vertex#4 +*| +*| +*| +*(PIPELINED) +* vertex#5 +*/ + @Test + public void testOnExecutionStateChange() { + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); + JobVertex[] jobVertices = new JobVertex[6]; + + for (int i = 0; i < 6; i++) { + jobVertices[i] = new JobVertex("vertex#" + i); + } + + jobVertices[3].connectNewDataSetAsInput(jobVertices[0], DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + jobVertices[3].connectNewDataSetAsInput(jobVertices[1], DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + jobVertices[4].connectNewDataSetAsInput(jobVertices[3], DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
[GitHub] [flink] WeiZhong94 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support
WeiZhong94 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support URL: https://github.com/apache/flink/pull/8267#discussion_r281002536 ## File path: flink-python/pyflink/table/table.py ## @@ -0,0 +1,117 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from py4j.java_gateway import get_method + +__all__ = ['Table'] + + +class Table(object): + +""" +A :class:`Table` is the core component of the Table API. +Similar to how the batch and streaming APIs have DataSet and DataStream, +the Table API is built around :class:`Table`. + +Use the methods of :class:`Table` to transform data. + +Example: +:: +>>> t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(1).build() +>>> t_env = TableEnvironment.get_table_environment(t_config) +>>> ... +>>> t = t_env.scan("source") +>>> t.select(...) +... +>>> t.insert_into("print") +>>> t_env.execute() + +Operations such as :func:`~pyflink.table.Table.join`, :func:`~pyflink.table.Table.select`, +:func:`~pyflink.table.Table.where` and :func:`~pyflink.table.Table.group_by` +take arguments in an expression string. Please refer to the documentation for +the expression syntax. 
+""" + +def __init__(self, j_table): +self._j_table = j_table + +def select(self, fields): +""" +Performs a selection operation. Similar to a SQL SELECT statement. The field expressions +can contain complex expressions. + +Example: +:: +>>> t = tab.select("key, value + 'hello'") Review comment: That point makes sense. I'll adjust this documentation to be consistent with the Scala documentation.
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002374 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Unit test for {@link LazyFromSourcesSchedulingStrategy}. + */ +public class LazyFromSourcesSchedulingStrategyTest extends TestLogger { + + /** +* Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology. 
+*/ + @Test + public void testStartScheduling() { + JobVertex jobVertex1 = new JobVertex("vertex#1"); + JobVertex jobVertex2 = new JobVertex("vertex#2"); + JobGraph graph = new JobGraph(jobVertex1, jobVertex2); + jobVertex2.connectNewDataSetAsInput(jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); + + for (int i = 0; i < 3; i++) { + schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex1.getID(), i)); + schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex2.getID(), i)); + } + + TestingSchedulerOperation testingSchedulerOperation = new TestingSchedulerOperation(); + LazyFromSourcesSchedulingStrategy schedulingStrategy = new LazyFromSourcesSchedulingStrategy( + testingSchedulerOperation, + schedulingTopology, + graph); + + schedulingStrategy.startScheduling(); + + assertEquals(3, testingSchedulerOperation.getScheduledVertices().size()); + } + + /** +* Tests that when on execution state change will start available downstream vertices. +* vertex#0vertex#1 +* \ / +*\ / +* \ / +* (BLOCKING, ALL) +* vertex#3 vertex#2 +* \/ +*\ / +* \/ +* (BLOCKING, ANY) +* vertex#4 +*| +*| +*| +*(PIPELINED) +* vertex#5 +*/ + @Test + public void testOnExecutionStateChange() { + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); + JobVertex[] jobVertices = new JobVertex[6]; + + for (int i = 0; i < 6; i++) { + jobVertices[i] = new JobVertex("vertex#" + i); + } + + jobVertices[3].connectNewDataSetAsInput(jobVertices[0], DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + jobVertices[3].connectNewDataSetAsInput(jobVertices[1], DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + jobVertices[4].connectNewDataSetAsInput(jobVertices[3], DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002364 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategyTest.java ## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Unit test for {@link LazyFromSourcesSchedulingStrategy}. + */ +public class LazyFromSourcesSchedulingStrategyTest extends TestLogger { + + /** +* Tests that when start scheduling lazy from sources scheduling strategy will start input vertices in scheduling topology. 
+*/ + @Test + public void testStartScheduling() { + JobVertex jobVertex1 = new JobVertex("vertex#1"); + JobVertex jobVertex2 = new JobVertex("vertex#2"); + JobGraph graph = new JobGraph(jobVertex1, jobVertex2); + jobVertex2.connectNewDataSetAsInput(jobVertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); + + for (int i = 0; i < 3; i++) { + schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex1.getID(), i)); + schedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertex2.getID(), i)); + } + + TestingSchedulerOperation testingSchedulerOperation = new TestingSchedulerOperation(); + LazyFromSourcesSchedulingStrategy schedulingStrategy = new LazyFromSourcesSchedulingStrategy( + testingSchedulerOperation, + schedulingTopology, + graph); + + schedulingStrategy.startScheduling(); + + assertEquals(3, testingSchedulerOperation.getScheduledVertices().size()); + } + + /** +* Tests that when on execution state change will start available downstream vertices. +* vertex#0vertex#1 +* \ / +*\ / +* \ / +* (BLOCKING, ALL) +* vertex#3 vertex#2 +* \/ +*\ / +* \/ +* (BLOCKING, ANY) +* vertex#4 +*| +*| +*| +*(PIPELINED) +* vertex#5 +*/ + @Test + public void testOnExecutionStateChange() { + TestingSchedulingTopology schedulingTopology = new TestingSchedulingTopology(); Review comment: OK
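For readers following the quoted test, here is a minimal, self-contained sketch of the scheduling idea it exercises: source vertices start immediately, and a downstream vertex starts once its input constraint (ANY or ALL) is satisfied by finished upstream vertices. This is not the PR's code. The class `LazySchedulingSketch` and its methods are hypothetical, it tracks whole vertices rather than result partitions, and the topology is only an assumed reading of the flattened diagram (vertex#0 and vertex#1 feed vertex#3 under ALL; vertex#3 and vertex#2 feed vertex#4 under ANY; the PIPELINED edge to vertex#5 is left out).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of lazy-from-sources scheduling; not a Flink API. */
final class LazySchedulingSketch {

    /** Loosely mirrors the ANY/ALL choice of an input dependency constraint. */
    enum Constraint { ANY, ALL }

    // Vertex name -> upstream vertices whose (blocking) results it consumes.
    private final Map<String, List<String>> inputs = new LinkedHashMap<>();
    private final Map<String, Constraint> constraints = new LinkedHashMap<>();
    private final Set<String> finished = new HashSet<>();
    private final Set<String> scheduled = new HashSet<>();

    void addVertex(String name, Constraint constraint, String... upstream) {
        inputs.put(name, Arrays.asList(upstream));
        constraints.put(name, constraint);
    }

    /** Schedules every vertex that has no inputs and returns the newly scheduled ones. */
    List<String> startScheduling() {
        List<String> started = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : inputs.entrySet()) {
            if (entry.getValue().isEmpty() && scheduled.add(entry.getKey())) {
                started.add(entry.getKey());
            }
        }
        return started;
    }

    /** Marks a vertex as finished and returns the consumers that became schedulable. */
    List<String> onFinished(String vertex) {
        finished.add(vertex);
        List<String> started = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : inputs.entrySet()) {
            String candidate = entry.getKey();
            List<String> upstream = entry.getValue();
            if (upstream.isEmpty() || scheduled.contains(candidate)) {
                continue; // sources and already-scheduled vertices are skipped
            }
            boolean ready = constraints.get(candidate) == Constraint.ALL
                ? finished.containsAll(upstream)
                : upstream.stream().anyMatch(finished::contains);
            if (ready) {
                scheduled.add(candidate);
                started.add(candidate);
            }
        }
        return started;
    }

    public static void main(String[] args) {
        LazySchedulingSketch sketch = new LazySchedulingSketch();
        sketch.addVertex("v0", Constraint.ALL);
        sketch.addVertex("v1", Constraint.ALL);
        sketch.addVertex("v2", Constraint.ALL);
        sketch.addVertex("v3", Constraint.ALL, "v0", "v1");
        sketch.addVertex("v4", Constraint.ANY, "v3", "v2");

        System.out.println(sketch.startScheduling()); // [v0, v1, v2]
        System.out.println(sketch.onFinished("v0"));  // []
        System.out.println(sketch.onFinished("v1"));  // [v3]
        System.out.println(sketch.onFinished("v2"));  // [v4]
    }
}
```

In this sketch `v3` waits for both of its inputs, while `v4` starts as soon as either input finishes, which is the difference between the two constraints that the test's diagram appears to label.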
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002357 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002317 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002177 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002120 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002108 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.Collection; + +/** + * Simple implementation of {@link SchedulingResultPartition} for testing. + */ +public class TestingSchedulingResultPartition implements SchedulingResultPartition { + private final IntermediateDataSetID intermediateDataSetID; + + private final IntermediateResultPartitionID intermediateResultPartitionID; + + private final ResultPartitionType partitionType; + + private final SchedulingVertex producer; + + private Collection consumers; Review comment: OK
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002116 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002114 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002106 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingResultPartition.java ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.Collection; + +/** + * Simple implementation of {@link SchedulingResultPartition} for testing. 
+ */ +public class TestingSchedulingResultPartition implements SchedulingResultPartition { + private final IntermediateDataSetID intermediateDataSetID; + + private final IntermediateResultPartitionID intermediateResultPartitionID; + + private final ResultPartitionType partitionType; + + private final SchedulingVertex producer; + + private Collection consumers; + + TestingSchedulingResultPartition(IntermediateDataSetID dataSetID, + IntermediateResultPartitionID partitionID, ResultPartitionType type, SchedulingVertex producer, + Collection consumers) { + this.intermediateDataSetID = dataSetID; + this.intermediateResultPartitionID = partitionID; + this.partitionType = type; + this.producer = producer; + this.consumers = consumers; + } + + @Override + public IntermediateResultPartitionID getId() { + return intermediateResultPartitionID; + } + + @Override + public IntermediateDataSetID getResultId() { + return intermediateDataSetID; + } + + @Override + public ResultPartitionType getPartitionType() { + return partitionType; + } + + @Override + public ResultPartitionState getState() { + return ResultPartitionState.DONE; + } + + @Override + public SchedulingVertex getProducer() { + return producer; + } + + @Override + public Collection getConsumers() { + return consumers; + } + + public void setConsumers(Collection consumers) { + this.consumers = consumers; Review comment: OK This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281002110 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support
sunjincheng121 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support URL: https://github.com/apache/flink/pull/8267#discussion_r281002095 ## File path: flink-dist/src/main/flink-bin/bin/pyflink2.sh ## @@ -0,0 +1,48 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# = +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +FLINK_CLASSPATH=`constructFlinkClassPath` + +ARGS=() + +while [[ $# -gt 0 ]] +do +key="$1" +case $key in +-c|--class) +DRIVER=$2 +shift +shift +;; +*) + ARGS+=("$1") + shift + ;; +esac +done + +PYTHON_JAR_PATH=`echo "$FLINK_ROOT_DIR"/opt/flink-python-*.jar` +TABLE_JAR_PATH=`echo "$FLINK_ROOT_DIR"/opt/flink-table*.jar` +exec $JAVA_RUN $JVM_ARGS -cp ${FLINK_CLASSPATH}:${TABLE_JAR_PATH}:${PYTHON_JAR_PATH} ${DRIVER} ${ARGS[@]} Review comment: Makes sense, if we rename `pyflink2.sh` to `pyflink-gateway-server.sh`, which users never run directly.
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281001965 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281001882 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { Review comment: Yes, `schedulingVertex.getConsumedResultPartitions().isEmpty()` would work. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
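The check agreed on in the review comment above can be sketched in isolation. This is only a minimal illustration, assuming simplified stand-in types: the `Partition` and `Vertex` interfaces, the `vertex` factory, and `selectSources` are hypothetical and are not Flink's actual `SchedulingVertex` or `SchedulingResultPartition` API. It shows a vertex being treated as a source when it consumes no result partitions, with no lookup through the `JobGraph`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class SourceVertexSketch {

    /** Hypothetical stand-in for a result partition; not Flink's real interface. */
    public interface Partition { }

    /** Hypothetical stand-in for a scheduling vertex, reduced to what the check needs. */
    public interface Vertex {
        String getId();

        Collection<Partition> getConsumedResultPartitions();
    }

    /** Builds a vertex that consumes the given partitions (none means a source). */
    public static Vertex vertex(String id, Partition... consumed) {
        List<Partition> partitions = Collections.unmodifiableList(Arrays.asList(consumed));
        return new Vertex() {
            @Override
            public String getId() {
                return id;
            }

            @Override
            public Collection<Partition> getConsumedResultPartitions() {
                return partitions;
            }
        };
    }

    /** A vertex without consumed partitions has no upstream input. */
    public static boolean isSource(Vertex v) {
        return v.getConsumedResultPartitions().isEmpty();
    }

    /** Collects the ids of all vertices that can be scheduled immediately. */
    public static List<String> selectSources(Collection<Vertex> topology) {
        List<String> sources = new ArrayList<>();
        for (Vertex v : topology) {
            if (isSource(v)) {
                sources.add(v.getId());
            }
        }
        return sources;
    }

    public static void main(String[] args) {
        Vertex source = vertex("source");
        Vertex map = vertex("map", new Partition() { });
        System.out.println(selectSources(Arrays.asList(source, map)));
    }
}
```

In this sketch the decision depends only on the vertex itself, which is the property that would let the strategy drop its `JobGraph` dependency for this particular check.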
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281001867 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { Review comment: OK, I thought the `verticesNeedingRestart` of `restartTasks` are well chosen for directly scheduling vertices before. 
There would be another small difference between the implementations of `startScheduling` and `restartTasks`: the former schedules, out of all vertices in the topology, those without input result partitions, and the
[jira] [Assigned] (FLINK-10976) Add Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng reassigned FLINK-10976: --- Assignee: Hequn Cheng > Add Aggregate operator to Table API > --- > > Key: FLINK-10976 > URL: https://issues.apache.org/jira/browse/FLINK-10976 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Add Aggregate operator to Table API as described in [Google > doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr]. > The usage: > {code:java} > val res = tab > .groupBy('a) // leave out groupBy-clause to define global aggregates > .agg(fun: AggregateFunction) // output has columns 'a, 'b, 'c > .select('a, 'c) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8343: [hotfix][runtime] Remove useless code in JSONGenerator
flinkbot commented on issue #8343: [hotfix][runtime] Remove useless code in JSONGenerator URL: https://github.com/apache/flink/pull/8343#issuecomment-489383552 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r281001725 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java ## @@ -187,8 +186,6 @@ private void decorateNode(Integer vertexID, ObjectNode node) { node.put(PACT, "Operator"); } - StreamOperator operator = streamGraph.getStreamNode(vertexID).getOperator(); Review comment: Yes, new pr: https://github.com/apache/flink/pull/8343 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi opened a new pull request #8343: [hotfix][runtime] Remove useless code in JSONGenerator
JingsongLi opened a new pull request #8343: [hotfix][runtime] Remove useless code in JSONGenerator
URL: https://github.com/apache/flink/pull/8343

## What is the purpose of the change

Remove useless code in JSONGenerator for https://github.com/apache/flink/pull/8295

## Verifying this change

none

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
[jira] [Assigned] (FLINK-10976) Add Aggregate operator to Table API
[ https://issues.apache.org/jira/browse/FLINK-10976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu reassigned FLINK-10976:
---
Assignee: (was: Dian Fu)

> Add Aggregate operator to Table API
> ---
>
> Key: FLINK-10976
> URL: https://issues.apache.org/jira/browse/FLINK-10976
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: sunjincheng
> Priority: Major
> Labels: pull-request-available
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Add Aggregate operator to Table API as described in [Google doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].
> The usage:
> {code:java}
> val res = tab
>   .groupBy('a) // leave out groupBy-clause to define global aggregates
>   .agg(fun: AggregateFunction) // output has columns 'a, 'b, 'c
>   .select('a, 'c)
> {code}
[GitHub] [flink] dianfu closed pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API
dianfu closed pull request #7235: [FLINK-10976] [table] Add support for aggregate to table API URL: https://github.com/apache/flink/pull/7235
[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on a change in pull request #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#discussion_r281001575 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java ## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for batch job which schedule vertices when input data are ready. 
+ */ +public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(true); + + private final JobGraph jobGraph; + + private final Map intermediateDataSets; + + public LazyFromSourcesSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + this.jobGraph = checkNotNull(jobGraph); + this.intermediateDataSets = new HashMap<>(); + } + + @Override + public void startScheduling() { + List executionVertexDeploymentOptions = new LinkedList<>(); + for (SchedulingVertex schedulingVertex : schedulingTopology.getVertices()) { + if (jobGraph.findVertexByID(schedulingVertex.getJobVertexId()).isInputVertex()) { + // schedule vertices without consumed result partition + executionVertexDeploymentOptions.add( + new ExecutionVertexDeploymentOption(schedulingVertex.getId(), deploymentOption)); + } + + Collection partitions = schedulingVertex.getProducedResultPartitions(); + if (partitions != null) { + for (SchedulingResultPartition srp : partitions) { + SchedulingIntermediateDataSet sid = intermediateDataSets.computeIfAbsent(srp.getResultId(), + (key) -> new SchedulingIntermediateDataSet()); + sid.increaseProducerCnt(); + } + } + } + if (!executionVertexDeploymentOptions.isEmpty()) { + schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions); + } + } + + @Override + public void restartTasks(Set verticesNeedingRestart) { + List executionVertexDeploymentOptions = new ArrayList<>(verticesNeedingRestart.size()); + for (ExecutionVertexID executionVertexID : verticesNeedingRestart) { + executionVertexDeploymentOptions.add( + new 
ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
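For readers following the diff above, the producer bookkeeping in `startScheduling()` can be tried out in isolation. The sketch below is a simplified stand-in, not the PR's code: result IDs are plain strings, `DataSetInfo` is a hypothetical replacement for `SchedulingIntermediateDataSet`, and `ProducerCountSketch` is an invented name. How the counts are consumed later is not visible in the truncated diff, so that part is left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SchedulingIntermediateDataSet: it only counts producers.
final class DataSetInfo {
    private int producerCnt;

    void increaseProducerCnt() {
        producerCnt++;
    }

    int getProducerCnt() {
        return producerCnt;
    }
}

final class ProducerCountSketch {

    // For every produced partition, look up (or lazily create) the entry for
    // its intermediate data set and bump the producer count, mirroring the
    // computeIfAbsent(...) call in the diff.
    static Map<String, DataSetInfo> countProducers(List<String> producedResultIds) {
        Map<String, DataSetInfo> dataSets = new HashMap<>();
        for (String resultId : producedResultIds) {
            dataSets.computeIfAbsent(resultId, key -> new DataSetInfo()).increaseProducerCnt();
        }
        return dataSets;
    }

    public static void main(String[] args) {
        // Two partitions belong to "ds-1", one to "ds-2" (made-up IDs).
        Map<String, DataSetInfo> dataSets = countProducers(Arrays.asList("ds-1", "ds-2", "ds-1"));
        System.out.println("ds-1 producers: " + dataSets.get("ds-1").getProducerCnt());
        System.out.println("ds-2 producers: " + dataSets.get("ds-2").getProducerCnt());
    }
}
```

`computeIfAbsent` keeps the lookup and the lazy creation in one call, so each data set gets exactly one entry no matter how many partitions feed it.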
[GitHub] [flink] flinkbot commented on issue #8342: [FLINK-12360][chinese-translation]Translate Jobs and Scheduling Page …
flinkbot commented on issue #8342: [FLINK-12360][chinese-translation]Translate Jobs and Scheduling Page …
URL: https://github.com/apache/flink/pull/8342#issuecomment-489383011

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.

The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12360) Translate "Jobs and Scheduling" Page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-12360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-12360:
---
Labels: pull-request-available (was: )

> Translate "Jobs and Scheduling" Page into Chinese
> -
>
> Key: FLINK-12360
> URL: https://issues.apache.org/jira/browse/FLINK-12360
> Project: Flink
> Issue Type: Task
> Components: chinese-translation, Documentation
> Reporter: Armstrong Nova
> Assignee: Armstrong Nova
> Priority: Major
> Labels: pull-request-available
>
> Translate the internal page "[https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html]" to Chinese.
> The doc is located in "flink/docs/internals/job_scheduling.md"; the translated doc is in "flink/docs/internals/job_scheduling.zh.md".
[GitHub] [flink] Armstrongya opened a new pull request #8342: [FLINK-12360][chinese-translation]Translate Jobs and Scheduling Page …
Armstrongya opened a new pull request #8342: [FLINK-12360][chinese-translation]Translate Jobs and Scheduling Page …
URL: https://github.com/apache/flink/pull/8342

Translate the internal page "https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html" to Chinese.

## What is the purpose of the change

This pull request completes the Chinese translation of "job_scheduling" from the official documentation.

## Brief change log

- *Translates the internal document "job_scheduling" into Chinese*

## Verifying this change

- *This change adds a new translated document.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] [flink] zhuzhurk commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface
zhuzhurk commented on a change in pull request #8233: [FLINK-12227] [runtime] introduce SchedulingStrategy interface URL: https://github.com/apache/flink/pull/8233#discussion_r281001358 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +/** + * Component that stores the task need to be scheduled and the option for deployment. + */ +public class ExecutionVertexDeploymentOption { + + private final ExecutionVertexID executionVertexId; + + private final DeploymentOption deploymentOption; + + public ExecutionVertexDeploymentOption(ExecutionVertexID executionVertexId, DeploymentOption deploymentOption) { Review comment: Sorry, I made a mistake here. I was a bit confused by DeploymentOption and ExecutionVertexDeploymentOption because of their similar names. Now I understand that they are actually different components: ExecutionVertexDeploymentOption is a deployment descriptor, and DeploymentOption is a wrapper of the options.
I just wonder whether we could use different naming patterns for them, such as ExecutionVertexDeploymentDescriptor and DeploymentOption?
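To make the distinction in this comment concrete, here is a minimal sketch of the two roles. It is not the PR's code: `ExecutionVertexDeploymentDescriptor` is only the name proposed in the comment, the vertex ID is simplified to a `String`, and the single boolean in `DeploymentOption` is a hypothetical placeholder called `exampleFlag`, since the diff does not show what the options actually are.

```java
import java.util.Objects;

// Wrapper around the deployment options themselves.
// Assumption: one boolean named exampleFlag stands in for the real options.
final class DeploymentOption {
    private final boolean exampleFlag;

    DeploymentOption(boolean exampleFlag) {
        this.exampleFlag = exampleFlag;
    }

    boolean isExampleFlag() {
        return exampleFlag;
    }
}

// Descriptor: pairs one vertex with the options it should be deployed with.
// The class name follows the rename suggested in the review comment.
final class ExecutionVertexDeploymentDescriptor {
    private final String executionVertexId; // simplified; the PR uses ExecutionVertexID
    private final DeploymentOption deploymentOption;

    ExecutionVertexDeploymentDescriptor(String executionVertexId, DeploymentOption deploymentOption) {
        this.executionVertexId = Objects.requireNonNull(executionVertexId);
        this.deploymentOption = Objects.requireNonNull(deploymentOption);
    }

    String getExecutionVertexId() {
        return executionVertexId;
    }

    DeploymentOption getDeploymentOption() {
        return deploymentOption;
    }
}
```

With this split, one `DeploymentOption` instance can be shared by many descriptors, which is how the strategy in #8309 appears to use its single `deploymentOption` field.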
[GitHub] [flink] dianfu commented on issue #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on issue #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#issuecomment-489382199 @hequn8128 Thanks a lot for the PR. LGTM overall with just a few comments.
[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#discussion_r281000530 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala ## @@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: TableEnvImpl) { aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child) } + def rowBasedAggregate( +groupingExpressions: JList[Expression], Review comment: indentation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#discussion_r281000842 ## File path: docs/dev/table/tableApi.md ## @@ -1889,6 +1889,57 @@ tableEnv.registerFunction("func", func); Table table = input .flatMap("func(c)").as("a, b") +{% endhighlight %} + + + + + +Aggregate +Batch Streaming + + +Performs an aggregate operation with an aggregate function. You have to close the "aggregate" with a select statement. The output of aggregate will be flattened if the output type is a composite type. +{% highlight java %} +public class MyMinMaxAcc { +public int min = 0; +public int max = 0; +} + +public class MyMinMax extends AggregateFunction { + +public void accumulate(MyMinMaxAcc acc, int value) { +if (value < acc.min) { +acc.min = value; +} +if (value > acc.max) { +acc.max = value; +} +} + +@Override +public MyMinMaxAcc createAccumulator() { +return new MyMinMaxAcc(); +} + +@Override +public Row getValue(MyMinMaxAcc acc) { +return Row.of(acc.min, acc.max); +} + +@Override +public TypeInformation getResultType() { +return new RowTypeInfo(Types.INT, Types.INT); +} +} + +AggregateFunction myAggFunc = new MyMinMax(); + +tableEnv.registerFunction("myAggFunc", myAggFunc); +Table table = input + .groupBy("key") + .aggregate("myAggFunc(a, b) as (x, y, z)") Review comment: as (x, y) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
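The reason for suggesting `as (x, y)` is that the function's result has exactly two fields. A Flink-free sketch of the same accumulator pattern shows this; it is an illustration only, not the documentation example itself. `MinMaxAcc` and `MinMaxSketch` are hypothetical names, the result is a plain `int[]` instead of a `Row`, and, as an assumption that differs from the quoted example, the accumulator starts at the integer extremes rather than at 0 so that the first value always wins.

```java
import java.util.Arrays;

// Hypothetical accumulator; plays the role of MyMinMaxAcc in the docs example.
final class MinMaxAcc {
    int min = Integer.MAX_VALUE; // assumption: start at the extremes, not at 0
    int max = Integer.MIN_VALUE;
}

final class MinMaxSketch {

    static MinMaxAcc createAccumulator() {
        return new MinMaxAcc();
    }

    // Called once per input value, like accumulate(...) in an AggregateFunction.
    static void accumulate(MinMaxAcc acc, int value) {
        if (value < acc.min) {
            acc.min = value;
        }
        if (value > acc.max) {
            acc.max = value;
        }
    }

    // The result has two fields, so it flattens into two output columns.
    static int[] getValue(MinMaxAcc acc) {
        return new int[] {acc.min, acc.max};
    }

    public static void main(String[] args) {
        MinMaxAcc acc = createAccumulator();
        for (int value : new int[] {5, -2, 9}) {
            accumulate(acc, value);
        }
        System.out.println(Arrays.toString(getValue(acc))); // prints [-2, 9]
    }
}
```

Two result fields mean two aliases, so a third name such as `z` has nothing to bind to.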
[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#discussion_r281001161 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala ## @@ -47,4 +47,54 @@ class AggregateValidationTest extends TableTestBase { // must fail. 'c is not a grouping key or aggregation .select('c) } + + @Test(expected = classOf[ValidationException]) + def testTableFunctionInSelection(): Unit = { Review comment: This test failed with the exception: org.apache.flink.table.api.ValidationException: Given parameters of function 'func' do not match any signature. Actual: (java.lang.Long) Expected: (java.lang.String) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#discussion_r281000705 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala ## @@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: TableEnvImpl) { aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child) } + def rowBasedAggregate( +groupingExpressions: JList[Expression], +aggregate: Expression, +child: TableOperation) + : TableOperation = { +// resolve for java string case, i.e., turn LookupCallExpression to CallExpression. +val resolver = resolverFor(tableCatalog, functionCatalog, child).build +val resolvedAggregate = resolveSingleExpression(aggregate, resolver) + +// extract alias and aggregate function +var alias: Seq[String] = Seq() +val aggWithoutAlias = resolvedAggregate match { + case c: CallExpression +if c.getFunctionDefinition.getName == BuiltInFunctionDefinitions.AS.getName => { +alias = c.getChildren + .drop(1) + .map(e => e.asInstanceOf[ValueLiteralExpression].getValue.asInstanceOf[String]) +c.getChildren.get(0) + } + case c: CallExpression +if c.getFunctionDefinition.isInstanceOf[AggregateFunctionDefinition] => { +if (alias.isEmpty) alias = UserDefinedFunctionUtils.getFieldInfo( + c.getFunctionDefinition.asInstanceOf[AggregateFunctionDefinition].getResultTypeInfo)._1 +c + } + case e => e Review comment: this line can be removed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#discussion_r281000442 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala ## @@ -580,3 +596,30 @@ class OverWindowedTableImpl( ) } } + +/** + * The implementation of a [[AggregatedTable]] that has been grouped on a set of grouping keys. Review comment: "a" -> "an". Also, "that has been grouped on a set of grouping keys" should read "that has been performed on an aggregate function".
[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#discussion_r281000507 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/tableImpl.scala ## @@ -580,3 +596,30 @@ class OverWindowedTableImpl( ) } } + +/** + * The implementation of a [[AggregatedTable]] that has been grouped on a set of grouping keys. + */ +class AggregatedTableImpl( + private[flink] val table: Table, Review comment: indentation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#discussion_r281000356 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java ## @@ -1058,4 +1058,39 @@ * */ Table flatMap(Expression tableFunction); + + /** +* Performs a global aggregate operation with an aggregate function. Use this before a selection +* to perform the selection operation. The output will be flattened if the output type is a +* composite type. +* +* Example: +* +* +* {@code +* AggregateFunction aggFunc = new MyAggregateFunction() +* tableEnv.registerFunction("aggFunc", aggFunc); +* table.aggregate("aggFunc(a, b) as (f0, f1, f2)") +* .select("key, f0, f1") Review comment: select("f0, f1")? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#discussion_r281000946 ## File path: docs/dev/table/tableApi.md ## @@ -1957,6 +2008,52 @@ class MyFlatMapFunction extends TableFunction[Row] { val func = new MyFlatMapFunction val table = input .flatMap(func('c)).as('a, 'b) +{% endhighlight %} + + + + + +Aggregate +Batch Streaming + + +Performs an aggregate operation with an aggregate function. You have to close the "aggregate" with a select statement. The output of aggregate will be flattened if the output type is a composite type. +{% highlight scala %} +case class MyMinMaxAcc(var min: Int, var max: Int) + +class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] { + + def accumulate(acc: MyMinMaxAcc, value: Int): Unit = { +if (value < acc.min) { + acc.min = value +} +if (value > acc.max) { + acc.max = value +} + } + + def resetAccumulator(acc: MyMinMaxAcc): Unit = { Review comment: The Scala example has resetAccumulator, while the Java example does not.
[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#discussion_r281000689 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala ## @@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: TableEnvImpl) { aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child) } + def rowBasedAggregate( +groupingExpressions: JList[Expression], +aggregate: Expression, +child: TableOperation) + : TableOperation = { +// resolve for java string case, i.e., turn LookupCallExpression to CallExpression. +val resolver = resolverFor(tableCatalog, functionCatalog, child).build +val resolvedAggregate = resolveSingleExpression(aggregate, resolver) + +// extract alias and aggregate function +var alias: Seq[String] = Seq() +val aggWithoutAlias = resolvedAggregate match { + case c: CallExpression +if c.getFunctionDefinition.getName == BuiltInFunctionDefinitions.AS.getName => { Review comment: the brace is not necessary. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#discussion_r281001183 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala ## @@ -47,4 +47,54 @@ class AggregateValidationTest extends TableTestBase { // must fail. 'c is not a grouping key or aggregation .select('c) } + + @Test(expected = classOf[ValidationException]) + def testTableFunctionInSelection(): Unit = { +val util = streamTestUtil() +val table = util.addTable[(Long, Int, String)]('a, 'b, 'c) + +util.tableEnv.registerFunction("func", new TableFunc0) +table + .groupBy('a) + .aggregate('b.sum as 'd) + // must fail. Cannot use TableFunction in select after aggregate + .select("func(a)") + } + + @Test(expected = classOf[ValidationException]) + def testInvalidExpressionInAggregate(): Unit = { +val util = streamTestUtil() +val table = util.addTable[(Long, Int, String)]('a, 'b, 'c) + +table + .groupBy('a) + // must fail. Only AggregateFunction can be used in aggregate Review comment: The exception message is not friendly for users: org.apache.flink.table.api.ValidationException: Invalid arguments [log(b), 'd'] for function: as This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#discussion_r281001232 ## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/validation/AggregateValidationTest.scala ## @@ -47,4 +47,54 @@ class AggregateValidationTest extends TableTestBase { // must fail. 'c is not a grouping key or aggregation .select('c) } + + @Test(expected = classOf[ValidationException]) + def testTableFunctionInSelection(): Unit = { +val util = streamTestUtil() +val table = util.addTable[(Long, Int, String)]('a, 'b, 'c) + +util.tableEnv.registerFunction("func", new TableFunc0) +table + .groupBy('a) + .aggregate('b.sum as 'd) + // must fail. Cannot use TableFunction in select after aggregate + .select("func(a)") + } + + @Test(expected = classOf[ValidationException]) + def testInvalidExpressionInAggregate(): Unit = { +val util = streamTestUtil() +val table = util.addTable[(Long, Int, String)]('a, 'b, 'c) + +table + .groupBy('a) + // must fail. Only AggregateFunction can be used in aggregate + .aggregate('b.log as 'd) + .select('a, 'd) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidExpressionInAggregate2(): Unit = { +val util = streamTestUtil() +val table = util.addTable[(Long, Int, String)]('a, 'b, 'c) + +util.tableEnv.registerFunction("func", new TableFunc0) +table + .groupBy('a) + // must fail. Only AggregateFunction can be used in aggregate + .aggregate("func(c) as d") + .select('a, 'd) + } + + @Test(expected = classOf[ExpressionParserException]) + def testMultipleAggregateExpressionInAggregate(): Unit = { +val util = streamTestUtil() +val table = util.addTable[(Long, Int, String)]('a, 'b, 'c) + +util.tableEnv.registerFunction("func", new TableFunc0) +table + .groupBy('a) + // must fail. 
Only AggregateFunction can be used in aggregate Review comment: Only AggregateFunction -> Only one AggregateFunction This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API URL: https://github.com/apache/flink/pull/8311#discussion_r281000677 ## File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/operations/OperationTreeBuilder.scala ## @@ -159,6 +159,66 @@ class OperationTreeBuilder(private val tableEnv: TableEnvImpl) { aggregateOperationFactory.createAggregate(resolvedGroupings, resolvedAggregates, child) } + def rowBasedAggregate( +groupingExpressions: JList[Expression], +aggregate: Expression, +child: TableOperation) + : TableOperation = { +// resolve for java string case, i.e., turn LookupCallExpression to CallExpression. +val resolver = resolverFor(tableCatalog, functionCatalog, child).build +val resolvedAggregate = resolveSingleExpression(aggregate, resolver) Review comment: use resolveExpression? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese URL: https://github.com/apache/flink/pull/8319#discussion_r281000916 ## File path: docs/dev/stream/state/schema_evolution.zh.md ## @@ -26,11 +26,11 @@ under the License. {:toc} Apache Flink 流应用通常被设计为永远或者长时间运行。 -与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。 -对于应用程序处理的数据结构来说,这是相同的;它们也随应用程序一起升级。 +与所有长期运行的服务一样,应用程序需要随着业务的迭代而进行调整。 +应用所处理的数据 schema 也会随着进行变化。 -此页面概述了如何升级状态类型的数据结构。 -目前的限制因不同类型和状态结构而异(`ValueState`、`ListState` 等)。 +此页面概述了如何升级状态类型的数据 schema 。 +目前对不同类系的状态结构(`ValueState`、`ListState` 等)有不同的限制 Review comment: `类系` --> `类型` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
klion26 commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese URL: https://github.com/apache/flink/pull/8319#discussion_r281000985 ## File path: docs/dev/stream/state/schema_evolution.zh.md ## @@ -80,19 +80,19 @@ Flink 内部是这样来进行处理的,首先会检查新的序列化器相 Flink 基于下面的规则来支持 [POJO 类型]({{ site.baseurl }}/zh/dev/types_serialization.html#pojo-类型的规则)结构的升级: - 1. 不可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。 + 1. 可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。 2. 可以添加字段。新字段会使用类型对应的默认值进行初始化,比如 [Java 类型](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html)。 3. 不可以修改字段的声明类型。 4. 不可以改变 POJO 类型的类名,包括类的命名空间。 -需要注意,只有对使用1.8.0及以上版本的 Flink savepoint 进行恢复时,POJO 类型的状态才可以进行升级。 -对1.8.0版本之前的 Flink 是没有办法进行 POJO 类型升级的。 +需要注意,只有从 1.8.0 及以上版本的 Flink 生产的 savepoint 进行恢复时,POJO 类型的状态才可以进行升级。 +对 1.8.0 版本之前的 Flink 是没有办法进行 POJO 类型升级的。 ### Avro 类型 Flink 完全支持 Avro 状态类型的升级,只要数据结构的修改是被 [Avro 的数据结构解析规则](http://avro.apache.org/docs/current/spec.html#Schema+Resolution)认为兼容的即可。 -一个限制是,当作业恢复时,Avro 生成的用作状态类型的类无法重定位或具有不同的命名空间。 +如果新的 Avro 数据 schema 生产的类无法被重定位或者使用了不同的命名空间,状态是没办法在作业恢复时进行升级的。 Review comment: `状态是没办法在作业恢复时进行升级的。` seems like spoken language. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Leeviiii commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
Leev commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese URL: https://github.com/apache/flink/pull/8319#discussion_r281000793 ## File path: docs/dev/stream/state/schema_evolution.zh.md ## @@ -48,62 +47,52 @@ checkpointedState = getRuntimeContext().getListState(descriptor); {% endhighlight %} -Under the hood, whether or not the schema of state can be evolved depends on the serializer used to read / write -persisted state bytes. Simply put, a registered state's schema can only be evolved if its serializer properly -supports it. This is handled transparently by serializers generated by Flink's type serialization framework -(current scope of support is listed [below]({{ site.baseurl }}/dev/stream/state/schema_evolution.html#supported-data-types-for-schema-evolution)). +在内部,状态是否可以进行升级取决于用于读写持久化状态字节的序列化器。 +简而言之,状态数据结构只有在其序列化器正确支持时才能升级。 +这一过程是被 Flink 的类型序列化框架生成的序列化器透明处理的([下面]({{ site.baseurl }}/zh/dev/stream/state/schema_evolution.html#数据结构升级支持的数据类型) 列出了当前的支持范围)。 -If you intend to implement a custom `TypeSerializer` for your state type and would like to learn how to implement -the serializer to support state schema evolution, please refer to -[Custom State Serialization]({{ site.baseurl }}/dev/stream/state/custom_serialization.html). -The documentation there also covers necessary internal details about the interplay between state serializers and Flink's -state backends to support state schema evolution. +如果你想要为你的状态类型实现自定义的 `TypeSerializer` 并且想要学习如何实现支持状态数据结构升级的序列化器, +可以参考 [自定义状态序列化器]({{ site.baseurl }}/zh/dev/stream/state/custom_serialization.html)。 +本文档也包含一些用于支持状态数据结构升级的状态序列化器与 Flink 状态后端存储相互作用的必要内部细节。 -## Evolving state schema +## 升级状态数据结构 -To evolve the schema of a given state type, you would take the following steps: +为了对给定的状态类型进行升级,你需要采取以下几个步骤: - 1. Take a savepoint of your Flink streaming job. - 2. Update state types in your application (e.g., modifying your Avro type schema). - 3. 
Restore the job from the savepoint. When accessing state for the first time, Flink will assess whether or not - the schema had been changed for the state, and migrate state schema if necessary. + 1. 对 Flink 流作业进行 savepoint 操作。 + 2. 升级程序中的状态类型(例如:修改你的 Avro 结构)。 + 3. 从 savepoint 处重启作业。当第一次访问状态数据时,Flink 会评估状态数据结构是否已经改变,并在必要的时候进行状态结构迁移。 -The process of migrating state to adapt to changed schemas happens automatically, and independently for each state. -This process is performed internally by Flink by first checking if the new serializer for the state has different -serialization schema than the previous serializer; if so, the previous serializer is used to read the state to objects, -and written back to bytes again with the new serializer. +用来适应状态结构的改变而进行的状态迁移过程是自动发生的,并且状态之间是互相独立的。 +Flink 内部是这样来进行处理的,首先会检查新的序列化器相对比之前的序列化器是否有不同的状态结构;如果有, +那么之前的序列化器用来读取状态数据字节到对象,然后使用新的序列化器将对象回写为字节。 -Further details about the migration process is out of the scope of this documentation; please refer to -[here]({{ site.baseurl }}/dev/stream/state/custom_serialization.html). +更多的迁移过程细节不在本文档谈论的范围;可以参考[文档]({{ site.baseurl }}/zh/dev/stream/state/custom_serialization.html)。 -## Supported data types for schema evolution +## 数据结构升级支持的数据类型 -Currently, schema evolution is supported only for POJO and Avro types. Therefore, if you care about schema evolution for -state, it is currently recommended to always use either Pojo or Avro for state data types. +目前,仅对于 POJO 以及 Avro 类型支持数据结构升级。 +因此,如果你比较关注于状态数据结构的升级,那么目前来看强烈推荐使用 Pojo 或者 Avro 状态数据类型。 -There are plans to extend the support for more composite types; for more details, -please refer to [FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896). 
+我们有计划支持更多的复合类型;更多的细节可以参考 [FLINK-10896](https://issues.apache.org/jira/browse/FLINK-10896)。 -### POJO types +### POJO 类型 -Flink supports evolving schema of [POJO types]({{ site.baseurl }}/dev/types_serialization.html#rules-for-pojo-types), -based on the following set of rules: +Flink 基于下面的规则来支持 [POJO 类型]({{ site.baseurl }}/zh/dev/types_serialization.html#pojo-类型的规则)结构的升级: - 1. Fields can be removed. Once removed, the previous value for the removed field will be dropped in future checkpoints and savepoints. - 2. New fields can be added. The new field will be initialized to the default value for its type, as -[defined by Java](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html). - 3. Declared fields types cannot change. - 4. Class name of the POJO type cannot change, including the namespace of the class. + 1. 不可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。 + 2. 可以添加字段。新字段会使用类型对应的默认值进行初始化,比如 [Java 类型](https://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html)。 + 3. 不可以修改字段的声明类型。 + 4. 不可以改变 POJO 类型的类名,包括类的命名空间。 -Note that the schema of POJO type state can only be evolved when restoring from a previous savepoint with Flink versions -newer than 1.8.0. When
[jira] [Comment Edited] (FLINK-12400) NullpointerException using SimpleStringSchema with Kafka
[ https://issues.apache.org/jira/browse/FLINK-12400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833187#comment-16833187 ] Congxian Qiu(klion26) edited comment on FLINK-12400 at 5/5/19 1:52 AM: --- [~PierreZ] thanks for filing this issue, I think there is already an issue want to fix this, please have a look at https://issues.apache.org/jira/browse/FLINK-11820 And a pr for FLINK-11820 [https://github.com/apache/flink/pull/7987] was (Author: klion26): [~PierreZ] thanks for filing this issue, I think there is already an issue want to fix this, please have a look at https://issues.apache.org/jira/browse/FLINK-11820 > NullpointerException using SimpleStringSchema with Kafka > > > Key: FLINK-12400 > URL: https://issues.apache.org/jira/browse/FLINK-12400 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System >Affects Versions: 1.7.2, 1.8.0 > Environment: Flink 1.7.2 job on 1.8 cluster > Kafka 0.10 with a topic in log-compaction >Reporter: Pierre Zemb >Assignee: Pierre Zemb >Priority: Minor > > Hi! > Yesterday, we saw a strange behavior with our Flink job and Kafka. We are > consuming a Kafka topic setup in > [log-compaction|https://kafka.apache.org/documentation/#compaction] mode. As > such, sending a message with a null payload acts like a tombstone. > We are consuming Kafka like this: > {code:java} > new FlinkKafkaConsumer010<> ("topic", new SimpleStringSchema(), > this.kafkaProperties) > {code} > When we sent the message, job failed because of a NullPointerException > [here|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java#L75]. > `byte[] message` was null, causing the NPE. > We forked the class and added a basic nullable check, returning null if so. > It fixed our issue. > Should we add it to the main class? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12400) NullpointerException using SimpleStringSchema with Kafka
[ https://issues.apache.org/jira/browse/FLINK-12400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833187#comment-16833187 ] Congxian Qiu(klion26) commented on FLINK-12400: --- [~PierreZ] thanks for filing this issue, I think there is already an issue want to fix this, please have a look at https://issues.apache.org/jira/browse/FLINK-11820 > NullpointerException using SimpleStringSchema with Kafka > > > Key: FLINK-12400 > URL: https://issues.apache.org/jira/browse/FLINK-12400 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System >Affects Versions: 1.7.2, 1.8.0 > Environment: Flink 1.7.2 job on 1.8 cluster > Kafka 0.10 with a topic in log-compaction >Reporter: Pierre Zemb >Assignee: Pierre Zemb >Priority: Minor > > Hi! > Yesterday, we saw a strange behavior with our Flink job and Kafka. We are > consuming a Kafka topic setup in > [log-compaction|https://kafka.apache.org/documentation/#compaction] mode. As > such, sending a message with a null payload acts like a tombstone. > We are consuming Kafka like this: > {code:java} > new FlinkKafkaConsumer010<> ("topic", new SimpleStringSchema(), > this.kafkaProperties) > {code} > When we sent the message, job failed because of a NullPointerException > [here|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java#L75]. > `byte[] message` was null, causing the NPE. > We forked the class and added a basic nullable check, returning null if so. > It fixed our issue. > Should we add it to the main class? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
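The null check described in the report can be sketched as below. This is a minimal illustration only, assuming a hypothetical class `NullTolerantStringSchema`; it is not Flink's `SimpleStringSchema`, and it is not the fix proposed in FLINK-11820. How the caller treats a null return value is outside the scope of the sketch.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a string deserialization schema.
// It only mirrors the idea from the report: tolerate a null payload.
final class NullTolerantStringSchema {

    private final Charset charset;

    NullTolerantStringSchema(Charset charset) {
        this.charset = charset;
    }

    // Returns null for a null payload (for example a tombstone on a
    // log-compacted topic) instead of throwing a NullPointerException.
    String deserialize(byte[] message) {
        if (message == null) {
            return null;
        }
        return new String(message, charset);
    }

    // Convenience factory using UTF-8, chosen here as an assumption.
    static NullTolerantStringSchema utf8() {
        return new NullTolerantStringSchema(StandardCharsets.UTF_8);
    }
}
```

Calling `NullTolerantStringSchema.utf8().deserialize(null)` returns null, while a non-null byte array is decoded as before.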
[jira] [Commented] (FLINK-12401) Support incremental emit for non-window streaming FlatAggregate on Table API
[ https://issues.apache.org/jira/browse/FLINK-12401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833186#comment-16833186 ] Jark Wu commented on FLINK-12401: - Does the incremental value refer to the ACC? Is this issue going to support the local-combine + global optimization for flatAggregate? > Support incremental emit for non-window streaming FlatAggregate on Table API > > > Key: FLINK-12401 > URL: https://issues.apache.org/jira/browse/FLINK-12401 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > As described in > [Flip-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739], > there are two output modes for non-window streaming flatAggregate. One is > emitting with full values, the other is emitting with incremental values. > [FLINK-10977|https://issues.apache.org/jira/browse/FLINK-10977] supports the > former one, this jira is going to support the latter one.
[GitHub] [flink] Leeviiii commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese
Leev commented on a change in pull request #8319: [FLINK-11636] [docs-zh] Translate "State Schema Evolution" into Chinese URL: https://github.com/apache/flink/pull/8319#discussion_r281000414 ## File path: docs/dev/stream/state/schema_evolution.zh.md ## @@ -25,17 +25,16 @@ under the License. * ToC {:toc} -Apache Flink streaming applications are typically designed to run indefinitely or for long periods of time. -As with all long-running services, the applications need to be updated to adapt to changing requirements. -This goes the same for data schemas that the applications work against; they evolve along with the application. +Apache Flink 流应用通常被设计为永远或者长时间运行。 +与所有长期运行的服务一样,需要更新应用程序以适应不断变化的需求。 Review comment: OK, thanks for your review.
[jira] [Created] (FLINK-12401) Support incremental emit for non-window streaming FlatAggregate on Table API
Hequn Cheng created FLINK-12401: --- Summary: Support incremental emit for non-window streaming FlatAggregate on Table API Key: FLINK-12401 URL: https://issues.apache.org/jira/browse/FLINK-12401 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Hequn Cheng Assignee: Hequn Cheng As described in [Flip-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739], there are two output modes for non-window streaming flatAggregate. One is emitting with full values, the other is emitting with incremental values. [FLINK-10977|https://issues.apache.org/jira/browse/FLINK-10977] supports the former one, this jira is going to support the latter one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support
sunjincheng121 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support URL: https://github.com/apache/flink/pull/8267#discussion_r281000307 ## File path: tools/verify_scala_suffixes.sh ## @@ -84,8 +84,9 @@ block_infected=0 # a) are not deployed during a release # b) exist only for dev purposes # c) no-one should depend on them +# exclude flink-python because there are 2 flink-python module currently, current logic goes wrong on this situation e2e_modules=$(find flink-end-to-end-tests -mindepth 2 -maxdepth 5 -name 'pom.xml' -printf '%h\n' | sort -u | tr '\n' ',') -excluded_modules=\!${e2e_modules//,/,\!},!flink-docs +excluded_modules=\!${e2e_modules//,/,\!},!flink-docs,!flink-python,!flink-libraries/flink-python Review comment: Good points!
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support
sunjincheng121 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support URL: https://github.com/apache/flink/pull/8267#discussion_r281000283 ## File path: flink-python/pyflink/table/table.py ## @@ -0,0 +1,117 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from py4j.java_gateway import get_method + +__all__ = ['Table'] + + +class Table(object): + +""" +A :class:`Table` is the core component of the Table API. +Similar to how the batch and streaming APIs have DataSet and DataStream, +the Table API is built around :class:`Table`. + +Use the methods of :class:`Table` to transform data. + +Example: +:: +>>> t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(1).build() +>>> t_env = TableEnvironment.get_table_environment(t_config) +>>> ... +>>> t = t_env.scan("source") +>>> t.select(...) +... +>>> t.insert_into("print") +>>> t_env.execute() + +Operations such as :func:`~pyflink.table.Table.join`, :func:`~pyflink.table.Table.select`, +:func:`~pyflink.table.Table.where` and :func:`~pyflink.table.Table.group_by` +take arguments in an expression string. Please refer to the documentation for +the expression syntax. 
+""" + +def __init__(self, j_table): +self._j_table = j_table + +def select(self, fields): +""" +Performs a selection operation. Similar to a SQL SELECT statement. The field expressions +can contain complex expressions. + +Example: +:: +>>> t = tab.select("key, value + 'hello'") Review comment: The reason I suggest removing the `t` is that we do not use it in the demo. See the Java version: https://github.com/apache/flink/blob/9975e0393a9e09dbde21ca61f1a5751cc5934411/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java#L114
[GitHub] [flink] sunjincheng121 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support
sunjincheng121 commented on a change in pull request #8267: [FLINK-12311][table][python] Add base python framework and Add Scan, Projection, and Filter operator support URL: https://github.com/apache/flink/pull/8267#discussion_r281000156 ## File path: flink-python/pyflink/find_flink_home.py ## @@ -0,0 +1,44 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import os +import sys + + +def _find_flink_home(): +""" +Find the FLINK_HOME. Review comment: Yes, adding the message to the logging system is better!
[GitHub] [flink] sunjincheng121 commented on issue #8230: [FLINK-10977][table] Add streaming non-window FlatAggregate to Table API
sunjincheng121 commented on issue #8230: [FLINK-10977][table] Add streaming non-window FlatAggregate to Table API URL: https://github.com/apache/flink/pull/8230#issuecomment-489378092 Thanks for your update! The changes look good to me now! One more thing: the current PR does not implement incremental mode, so can you create a JIRA for incremental mode? Best, Jincheng
[jira] [Commented] (FLINK-9679) Implement ConfluentRegistryAvroSerializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833108#comment-16833108 ] Bowen Li commented on FLINK-9679: - Hi [~dawidwys], what do you think of the relationship between this JIRA and [FLINK-12256|https://issues.apache.org/jira/browse/FLINK-12256] ? > Implement ConfluentRegistryAvroSerializationSchema > -- > > Key: FLINK-9679 > URL: https://issues.apache.org/jira/browse/FLINK-9679 > Project: Flink > Issue Type: Improvement > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) >Affects Versions: 1.6.0 >Reporter: Yazdan Shirvany >Assignee: Dominik Wosiński >Priority: Major > Labels: pull-request-available > > Implement AvroSerializationSchema using Confluent Schema Registry -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12399) FilterableTableSource does not use filters on job run
[ https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833107#comment-16833107 ] Rong Rong commented on FLINK-12399: --- Hi [~josh.bradt]. I think I found the root cause of this issue. Apparently you have to override the method {{explainSource}} so that Calcite knows that the newly created TableSource with the filter pushed down is different from the originally created CustomTableSource (on which {{applyPredicate}} has not been called). I think this might be related to changelog point #4 of https://github.com/apache/flink/pull/8324: when I tried upgrading to Calcite 1.19.0 I also encountered some weird issues where Calcite tries to find the correct table source from the digest strings. I will assign this to myself and start looking into the issue. Please let me know if adding the override resolves your issue for now. > FilterableTableSource does not use filters on job run > - > > Key: FLINK-12399 > URL: https://issues.apache.org/jira/browse/FLINK-12399 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.8.0 >Reporter: Josh Bradt >Priority: Major > Attachments: flink-filter-bug.tar.gz > > > As discussed [on the mailing > list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html], > there appears to be a bug where a job that uses a custom > FilterableTableSource does not keep the filters that were pushed down into > the table source. More specifically, the table source does receive filters > via applyPredicates, and a new table source with those filters is returned, > but the final job graph appears to use the original table source, which does > not contain any filters. > I attached a minimal example program to this ticket. 
The custom table source > is as follows: > {code:java} > public class CustomTableSource implements BatchTableSource, > FilterableTableSource { > private static final Logger LOG = > LoggerFactory.getLogger(CustomTableSource.class); > private final Filter[] filters; > private final FilterConverter converter = new FilterConverter(); > public CustomTableSource() { > this(null); > } > private CustomTableSource(Filter[] filters) { > this.filters = filters; > } > @Override > public DataSet getDataSet(ExecutionEnvironment execEnv) { > if (filters == null) { >LOG.info(" No filters defined "); > } else { > LOG.info(" Found filters "); > for (Filter filter : filters) { > LOG.info("FILTER: {}", filter); > } > } > return execEnv.fromCollection(allModels()); > } > @Override > public TableSource applyPredicate(List predicates) { > LOG.info("Applying predicates"); > List acceptedFilters = new ArrayList<>(); > for (final Expression predicate : predicates) { > converter.convert(predicate).ifPresent(acceptedFilters::add); > } > return new CustomTableSource(acceptedFilters.toArray(new Filter[0])); > } > @Override > public boolean isFilterPushedDown() { > return filters != null; > } > @Override > public TypeInformation getReturnType() { > return TypeInformation.of(Model.class); > } > @Override > public TableSchema getTableSchema() { > return TableSchema.fromTypeInfo(getReturnType()); > } > private List allModels() { > List models = new ArrayList<>(); > models.add(new Model(1, 2, 3, 4)); > models.add(new Model(10, 11, 12, 13)); > models.add(new Model(20, 21, 22, 23)); > return models; > } > } > {code} > > When run, it logs > {noformat} > 15:24:54,888 INFO com.klaviyo.filterbug.CustomTableSource >- Applying predicates > 15:24:54,901 INFO com.klaviyo.filterbug.CustomTableSource >- Applying predicates > 15:24:54,910 INFO com.klaviyo.filterbug.CustomTableSource >- Applying predicates > 15:24:54,977 INFO com.klaviyo.filterbug.CustomTableSource >- No filters defined {noformat} > which appears 
to indicate that although filters are getting pushed down, the > final job does not use them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
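The hypothesis in the comment above (sources being looked up by digest string, so that a filtered copy with an unchanged digest is mistaken for the original) can be illustrated with a toy model. Everything below is an assumption made for illustration: `SketchSource`, `DigestCache`, and both digest methods are hypothetical names and do not reproduce Calcite's or Flink's actual planner code or the real `explainSource` signature.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical table source: filters == null means nothing was pushed down.
final class SketchSource {

    final String[] filters;

    SketchSource(String[] filters) {
        this.filters = filters;
    }

    // Digest that ignores pushed-down filters, so the original and the
    // filtered copy describe themselves identically.
    String defaultDigest() {
        return "SketchSource";
    }

    // Digest that includes the filters, analogous in spirit to overriding
    // the source description so the two instances differ.
    String filterAwareDigest() {
        return filters == null
                ? "SketchSource"
                : "SketchSource(filters=" + Arrays.toString(filters) + ")";
    }
}

// Hypothetical lookup that keeps the first source seen for each digest.
final class DigestCache {

    private final Map<String, SketchSource> byDigest = new HashMap<>();

    // Returns the source already registered under this digest, or the
    // given source if the digest has not been seen before.
    SketchSource register(String digest, SketchSource source) {
        SketchSource existing = byDigest.putIfAbsent(digest, source);
        return existing == null ? source : existing;
    }
}
```

In this model, registering the original and then the filtered source under `defaultDigest()` hands back the unfiltered original both times, which matches the "No filters defined" symptom in the log; with `filterAwareDigest()` the filtered instance is kept.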
[jira] [Assigned] (FLINK-12399) FilterableTableSource does not use filters on job run
[ https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rong Rong reassigned FLINK-12399: - Assignee: Rong Rong > FilterableTableSource does not use filters on job run > - > > Key: FLINK-12399 > URL: https://issues.apache.org/jira/browse/FLINK-12399 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Affects Versions: 1.8.0 >Reporter: Josh Bradt >Assignee: Rong Rong >Priority: Major > Attachments: flink-filter-bug.tar.gz > > > As discussed [on the mailing > list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html], > there appears to be a bug where a job that uses a custom > FilterableTableSource does not keep the filters that were pushed down into > the table source. More specifically, the table source does receive filters > via applyPredicates, and a new table source with those filters is returned, > but the final job graph appears to use the original table source, which does > not contain any filters. > I attached a minimal example program to this ticket. 
The custom table source > is as follows: > {code:java} > public class CustomTableSource implements BatchTableSource, > FilterableTableSource { > private static final Logger LOG = > LoggerFactory.getLogger(CustomTableSource.class); > private final Filter[] filters; > private final FilterConverter converter = new FilterConverter(); > public CustomTableSource() { > this(null); > } > private CustomTableSource(Filter[] filters) { > this.filters = filters; > } > @Override > public DataSet getDataSet(ExecutionEnvironment execEnv) { > if (filters == null) { >LOG.info(" No filters defined "); > } else { > LOG.info(" Found filters "); > for (Filter filter : filters) { > LOG.info("FILTER: {}", filter); > } > } > return execEnv.fromCollection(allModels()); > } > @Override > public TableSource applyPredicate(List predicates) { > LOG.info("Applying predicates"); > List acceptedFilters = new ArrayList<>(); > for (final Expression predicate : predicates) { > converter.convert(predicate).ifPresent(acceptedFilters::add); > } > return new CustomTableSource(acceptedFilters.toArray(new Filter[0])); > } > @Override > public boolean isFilterPushedDown() { > return filters != null; > } > @Override > public TypeInformation getReturnType() { > return TypeInformation.of(Model.class); > } > @Override > public TableSchema getTableSchema() { > return TableSchema.fromTypeInfo(getReturnType()); > } > private List allModels() { > List models = new ArrayList<>(); > models.add(new Model(1, 2, 3, 4)); > models.add(new Model(10, 11, 12, 13)); > models.add(new Model(20, 21, 22, 23)); > return models; > } > } > {code} > > When run, it logs > {noformat} > 15:24:54,888 INFO com.klaviyo.filterbug.CustomTableSource >- Applying predicates > 15:24:54,901 INFO com.klaviyo.filterbug.CustomTableSource >- Applying predicates > 15:24:54,910 INFO com.klaviyo.filterbug.CustomTableSource >- Applying predicates > 15:24:54,977 INFO com.klaviyo.filterbug.CustomTableSource >- No filters defined {noformat} > which appears 
to indicate that although filters are getting pushed down, the > final job does not use them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11909) Provide default failure/timeout/backoff handling strategy for AsyncIO functions
[ https://issues.apache.org/jira/browse/FLINK-11909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831421#comment-16831421 ] Rong Rong edited comment on FLINK-11909 at 5/4/19 3:28 PM: --- I took some time to look at the {{AsyncFunction}} API and the {{AsyncWaitOperator}}. My understanding is that: when user implements an AsyncFunction, it has full control on how the asyncInvoke being executed (including creation of the ExecutorService used, # of threads). Thus user can also include any of the retry policies if needed (by directly create a retry mechanism) The limitation of this is: 1. Users does not have access to timer service from AsyncFunction - On the contrary, {{AsyncWaitOperator}} have access to register / cleanup / invoke {{AsyncFunction#timeout}} API which makes it more flexible in terms of creating retry policies. 2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is input-element independent - This does not fit the "rate limiting" requirement from the mailing list: I am assuming this is more like a retry queue for a specific outbound service, not limited to a specific input element. - Or in another way, some sort of token-based rate limit coordination is needed across input elements. 3. Retry within {{AsyncFunction}} cannot allow access to {{StreamElementQueue}}, which means it cannot alter the ordering of the elements being executed - This doesn't fit into the description in Shuyi's comment of a DLQ (where failed component are put in the back of the queue before it can be retried the 2nd time) I have a general idea to extend the {{RuntimeContext}} to create a {{RetryContext}} to solve these issues. But I want to also make sure that the above concerns I mentioned are valid and should be addressed by Flink. otherwise, I think the current AsyncFunction API is good to go for some basic retry policies. Any thoughts? 
[~till.rohrmann] [~suez1224] was (Author: walterddr): I took some time to look at the {{AsyncFunction}} API and the {{AsyncWaitOperator}}. My understanding is that: when user implements an AsyncFunction, it has full control on how the asyncInvoke being executed (including creation of the ExecutorService used, # of threads). Thus user can also include any of the retry policies if needed (by directly create a retry mechanism) The limitation of this is: 1. Users does not have access to timer service from AsyncFunction - On the contrary, {{AsyncWaitOperator}} have access to register / cleanup / invoke {{AsyncFunction#timeout}} API which makes it more flexible in terms of creating retry policies. 2. Retry within {{AsyncFunction#asyncInvoke}} would mean the retry is input-element independent - This does not fit the "rate limiting" requirement from the mailing list: I am assuming this is more like a retry queue for a specific outbound service, not limited to a specific input element. 3. Retry within {{AsyncFunction}} cannot allow access to {{StreamElementQueue}}, which means it cannot alter the ordering of the elements being executed - This doesn't fit into the description in Shuyi's comment of a DLQ (where failed component are put in the back of the queue before it can be retried the 2nd time) I have a general idea to extend the {{RuntimeContext}} to create a {{RetryContext}} to solve these issues. But I want to also make sure that the above concerns I mentioned are valid and should be addressed by Flink. otherwise, I think the current AsyncFunction API is good to go for some basic retry policies. Any thoughts? 
[~till.rohrmann] [~suez1224]

> Provide default failure/timeout/backoff handling strategy for AsyncIO functions
> ---
>
>                 Key: FLINK-11909
>                 URL: https://issues.apache.org/jira/browse/FLINK-11909
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>            Reporter: Rong Rong
>            Assignee: Rong Rong
>            Priority: Major
>
> Currently Flink AsyncIO by default fails the entire job when an async function invocation fails [1]. It would be nice to have some default Async IO failure/timeout handling strategy, or to open up some APIs for the AsyncFunction timeout method to interact with the AsyncWaitOperator. For example (quoting [~suez1224] in [2]):
> * FAIL_OPERATOR (default & current behavior)
> * FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N times)
> * EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
> The discussion also extended to introducing configuration such as:
> * MAX_RETRY_COUNT
> * RETRY_FAILURE_POLICY
> REF:
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
> [2]
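
The three strategies named in the issue can be illustrated with a small plain-Java sketch. Everything here is an assumption made for illustration: `RetryDelays`, its nested `Strategy` enum, and `delayMillis` are hypothetical names that only borrow the strategy names from the issue, the doubling factor for the exponential case is an assumed choice, and none of this is an existing Flink API.

```java
/** Hypothetical helper, not part of Flink: computes the wait before a retry. */
final class RetryDelays {

    /** Strategy names borrowed from the issue text; the enum itself is an assumption. */
    enum Strategy { FAIL_OPERATOR, FIX_INTERVAL_RETRY, EXP_BACKOFF_RETRY }

    private RetryDelays() {}

    /**
     * Returns the delay in milliseconds before retry number {@code attempt} (1-based),
     * or -1 when no retry should happen (strategy forbids it or the retry budget is spent).
     * Assumes {@code baseMillis >= 0}.
     */
    static long delayMillis(Strategy strategy, int attempt, int maxRetryCount, long baseMillis) {
        if (strategy == Strategy.FAIL_OPERATOR || attempt < 1 || attempt > maxRetryCount) {
            return -1L;
        }
        if (strategy == Strategy.FIX_INTERVAL_RETRY) {
            return baseMillis;
        }
        // EXP_BACKOFF_RETRY: baseMillis * 2^(attempt - 1), with the exponent capped
        // and the product saturated so the result cannot overflow.
        int shift = Math.min(attempt - 1, 30);
        long factor = 1L << shift;
        return baseMillis > Long.MAX_VALUE / factor ? Long.MAX_VALUE : baseMillis * factor;
    }
}
```

With a base of 100 ms and a budget of 5 retries, the exponential strategy in this sketch waits 100, 200, 400, 800 and 1600 ms, while the fixed-interval strategy waits 100 ms each time. Where such a delay would be scheduled (inside the user's own executor or through the operator's timer service) is exactly the open question raised in the comment above.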
[jira] [Assigned] (FLINK-12351) AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
[ https://issues.apache.org/jira/browse/FLINK-12351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu reassigned FLINK-12351:
---

    Assignee: Jark Wu

> AsyncWaitOperator should deep copy StreamElement when object reuse is enabled
> -
>
>                 Key: FLINK-12351
>                 URL: https://issues.apache.org/jira/browse/FLINK-12351
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, AsyncWaitOperator directly puts the input StreamElement into {{StreamElementQueue}}. But when object reuse is enabled, the StreamElement is reused, which means the element in {{StreamElementQueue}} will be modified. As a result, the output of AsyncWaitOperator might be wrong.
> An easy way to fix this might be to deep copy the input StreamElement when object reuse is enabled, like this:
> https://github.com/apache/flink/blob/blink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java#L209

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
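
The failure mode described in FLINK-12351 can be reproduced without Flink. The following is a minimal sketch under stated assumptions: `ObjectReuseSketch`, `Element`, and `enqueueAll` are hypothetical stand-ins, with `Element` playing the role of a reused, mutable record and an `ArrayDeque` playing the role of the element queue. It is not the actual `AsyncWaitOperator` code or the linked fix.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical demo, not Flink code: why a reused object must be copied before queueing. */
final class ObjectReuseSketch {

    /** Stand-in for a mutable record that the upstream overwrites in place. */
    static final class Element {
        int value;

        Element(int value) {
            this.value = value;
        }

        /** A copy that shares no mutable state with the original. */
        Element copy() {
            return new Element(value);
        }
    }

    private ObjectReuseSketch() {}

    /** Feeds the inputs 1, 2, 3 through one reused instance and returns what the queue holds. */
    static List<Integer> enqueueAll(boolean copyOnEnqueue) {
        Queue<Element> queue = new ArrayDeque<>();
        Element reused = new Element(0);
        for (int input = 1; input <= 3; input++) {
            reused.value = input; // object reuse: the same instance carries every input
            queue.add(copyOnEnqueue ? reused.copy() : reused);
        }
        List<Integer> seen = new ArrayList<>();
        for (Element element : queue) {
            seen.add(element.value);
        }
        return seen;
    }
}
```

Without the copy, every queue entry points at the same instance, so the queue reports the last value three times; with the copy, each entry keeps the value it had when it was enqueued.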
[GitHub] [flink] wuchong commented on issue #8302: [FLINK-12269][table-blink] Support Temporal Table Join in blink planner and runtime
wuchong commented on issue #8302: [FLINK-12269][table-blink] Support Temporal Table Join in blink planner and runtime
URL: https://github.com/apache/flink/pull/8302#issuecomment-489331877

Hi @KurtYoung, I have addressed the review comments:

1. Added unit tests for all join runners.
2. Fixed a code generation problem for temporal joins with a UDF filter (a bug found recently in internal use).

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r280980371

## File path: docs/ops/state/savepoints.zh.md ##
@@ -25,32 +25,22 @@ under the License.
 * toc
 {:toc}
-## What is a Savepoint? How is a Savepoint different from a Checkpoint?
+## 什么是 Savepoint ? Savepoint 与 Checkpoint 有什么不同?
-A Savepoint is a consistent image of the execution state of a streaming job, created via Flink's [checkpointing mechanism]({{ site.baseurl }}/internals/stream_checkpointing.html). You can use Savepoints to stop-and-resume, fork,
-or update your Flink jobs. Savepoints consist of two parts: a directory with (typically large) binary files on stable storage (e.g. HDFS, S3, ...) and a (relatively small) meta data file. The files on stable storage represent the net data of the job's execution state
-image. The meta data file of a Savepoint contains (primarily) pointers to all files on stable storage that are part of the Savepoint, in form of absolute paths.
+Savepoint 是依据 Flink [checkpointing 机制]({{ site.baseurl }}/internals/stream_checkpointing.html)所创建的流作业执行状态的一致镜像。 你可以使用 Savepoint 进行 Flink 作业的停止与重启、fork或者更新。 Savepoint 由两部分组成:稳定存储(列入 HDFS,S3,...) 上包含二进制文件的目录(通常很大),和元数据文件(相对较小)。 稳定存储上的文件表示作业执行状态的数据镜像。 Savepoint 的元数据文件以(绝对路径)的形式包含(主要)指向作为 Savepoint 一部分的稳定存储上的所有文件的指针。
-Attention: In order to allow upgrades between programs and Flink versions, it is important to check out the following section about assigning IDs to your operators.
+注意: 为了允许程序和 Flink 版本之间的升级,请务必查看以下有关分配算子 ID 的部分 。
+从概念上讲, Flink 的 Savepoint 与 Checkpoint 的不同之处类似于传统数据库中的备份与恢复日志之间的差异。 Checkpoint 的主要目的是为意外失败的作为提供恢复机制。 Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交互。 作为一种恢复和定期触发的方法,Checkpoint 实现有两个设计目标:i)轻量级创建和 ii)尽可能快地恢复。 可能会利用某些特定的属性来达到这个,例如, 工作代码在执行尝试之间不会改变。 在用户终止作业后,通常会删除 Checkpoint(除非明确配置为保留的 Checkpoint)。
-Conceptually, Flink's Savepoints are different from Checkpoints in a similar way that backups are different from recovery logs in traditional database systems. The primary purpose of Checkpoints is to provide a recovery mechanism in case of
-unexpected job failures. A Checkpoint's lifecycle is managed by Flink, i.e. a Checkpoint is created, owned, and released by Flink - without user interaction. As a method of recovery and being periodically triggered, two main
-design goals for the Checkpoint implementation are i) being as lightweight to create and ii) being as fast to restore from as possible. Optimizations towards those goals can exploit certain properties, e.g. that the job code
-doesn't change between the execution attempts. Checkpoints are usually dropped after the job was terminated by the user (except if explicitly configured as retained Checkpoints).
+ 与此相反、Savepoint 由用户创建,拥有和删除。 他们的用例是计划的,手动备份和恢复。 例如,升级 Flink 版本,调整用户逻辑,改变并行度,以及进行红蓝部署等。 当然,Savepoint 必须在终止工作后继续存在。 从概念上讲,Savepoint 的生成,恢复成本可能更高一些,Savepoint 更多地关注可移植性和对前面提到的作业更改的支持。
-In contrast to all this, Savepoints are created, owned, and deleted by the user. Their use-case is for planned, manual backup and resume. For example, this could be an update of your Flink version, changing your job graph,
-changing parallelism, forking a second job like for a red/blue deployment, and so on. Of course, Savepoints must survive job termination. Conceptually, Savepoints can be a bit more expensive to produce and restore and focus
-more on portability and support for the previously mentioned changes to the job.
+除去这些概念上的差异,Checkpoint 和 Savepoint 的当前实现基本上使用相同的代码并生成相同的格式。然而,目前有一个例外,我们可能会在未来引入更多的差异。例外情况是使用 RocksDB 状态后端的增量 Checkpoint。他们使用了一些 RocksDB 内部格式,而不是 Flink 的本机 Savepoint 格式。这使他们成为了与 Savepoint 相比,更轻量级的 Checkpoint 机制的第一个实例。
-Those conceptual differences aside, the current implementations of Checkpoints and Savepoints are basically using the same code and produce the same format. However, there is currently one exception from this, and we might
-introduce more differences in the future. The exception are incremental checkpoints with the RocksDB state backend. They are using some RocksDB internal format instead of Flink’s native savepoint format. This makes them the
-first instance of a more lightweight checkpointing mechanism, compared to Savepoints.
+## 分配算子ID
-## Assigning Operator IDs
-
-It is **highly recommended** that you adjust your programs as described in this section in order to be able to upgrade your programs in the future. The main required change is to manually specify operator IDs via the **`uid(String)`** method. These IDs are used to scope the state of each operator.
+**强烈建议**你按照本节所述调整你的程序,以便将来能够升级你的程序。主要需要的更改是通过**`uid(String)`**方法手动指定算子 ID 。这些 ID 用于限定每个操作符的状态。

Review comment:
I think this sentence can be improved: `主要需要的更改是通过**`uid(String)`**方法手动指定算子 ID`
[GitHub] [flink] klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r280980514

## File path: docs/ops/state/savepoints.zh.md ##
@@ -78,160 +68,158 @@ source-id | State of StatefulSource
 mapper-id | State of StatefulMapper
 {% endhighlight %}
-In the above example, the print sink is stateless and hence not part of the savepoint state. By default, we try to map each entry of the savepoint back to the new program.
+在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。
-## Operations
+## 算子
-You can use the [command line client]({{ site.baseurl }}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a savepoint*, *resume from savepoints*, and *dispose savepoints*.
+你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint *,*取消具有 Savepoint *的作业,*从 Savepoint *恢复,以及*部署 Savepoint *。
-With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the webui.
+使用 Flink >= 1.2.0,还可以使用 webui *从 Savepoint 恢复*。
-### Triggering Savepoints
+### 触发 Savepoint
-When triggering a savepoint, a new savepoint directory is created where the data as well as the meta data will be stored. The location of this directory can be controlled by [configuring a default target directory](#configuration) or by specifying a custom target directory with the trigger commands (see the [`:targetDirectory` argument](#trigger-a-savepoint)).
+当触发 Savepoint 时,将创建一个新的 Savepoint 目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory `参数](#trigger-a-savepoint)来控制该目录的位置。
-Attention: The target directory has to be a location accessible by both the JobManager(s) and TaskManager(s) e.g. a location on a distributed file-system.
+注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统上的位置。
-For example with a `FsStateBackend` or `RocksDBStateBackend`:
+例如,使用 `FsStateBackend` 或 `RocksDBStateBackend` :
 {% highlight shell %}
-# Savepoint target directory
-/savepoints/
+# Savepoint 目标目录
+/Savepoint/
-# Savepoint directory
-/savepoints/savepoint-:shortjobid-:savepointid/
+# Savepoint 目录
+/Savepoint/savepoint-:shortjobid-:savepointid/
-# Savepoint file contains the checkpoint meta data
-/savepoints/savepoint-:shortjobid-:savepointid/_metadata
+# Savepoint 文件包含 Checkpoint元数据
+/Savepoint/savepoint-:shortjobid-:savepointid/_metadata
-# Savepoint state
-/savepoints/savepoint-:shortjobid-:savepointid/...
+# Savepoint 状态
+/Savepoint/savepoint-:shortjobid-:savepointid/...
 {% endhighlight %}
- Note:
-Although it looks as if the savepoints may be moved, it is currently not possible due to absolute paths in the _metadata file.
-Please follow https://issues.apache.org/jira/browse/FLINK-5778;>FLINK-5778 for progress on lifting this restriction.
+ 注意:
+虽然看起来好像可以移动 Savepoint ,但由于 _metadata 文件中的绝对路径,目前无法进行保存。

Review comment:
`但由于 _metadata 文件中的绝对路径,目前无法进行保存。` --> `但由于 _metadata 中保存的是绝对路径,因此暂时不支持。`
[GitHub] [flink] klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r280980459

## File path: docs/ops/state/savepoints.zh.md ##
@@ -78,160 +68,158 @@ source-id | State of StatefulSource
 mapper-id | State of StatefulMapper
 {% endhighlight %}
-In the above example, the print sink is stateless and hence not part of the savepoint state. By default, we try to map each entry of the savepoint back to the new program.
+在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。
-## Operations
+## 算子
-You can use the [command line client]({{ site.baseurl }}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a savepoint*, *resume from savepoints*, and *dispose savepoints*.
+你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint *,*取消具有 Savepoint *的作业,*从 Savepoint *恢复,以及*部署 Savepoint *。
-With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the webui.
+使用 Flink >= 1.2.0,还可以使用 webui *从 Savepoint 恢复*。

Review comment:
`使用 Flink >= 1.2.0` --> `从 Flink 1.2.0 开始`
[GitHub] [flink] klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r280980436

## File path: docs/ops/state/savepoints.zh.md ##
@@ -78,160 +68,158 @@ source-id | State of StatefulSource
 mapper-id | State of StatefulMapper
 {% endhighlight %}
-In the above example, the print sink is stateless and hence not part of the savepoint state. By default, we try to map each entry of the savepoint back to the new program.
+在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。
-## Operations
+## 算子
-You can use the [command line client]({{ site.baseurl }}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a savepoint*, *resume from savepoints*, and *dispose savepoints*.
+你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint *,*取消具有 Savepoint *的作业,*从 Savepoint *恢复,以及*部署 Savepoint *。

Review comment:
`*取消具有 Savepoint *的作业` --> `*触发 Savepoint 并取消作业*`
`*部署 Savepoint *` --> `删除 Savepoint`?
[GitHub] [flink] klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r280980604

## File path: docs/ops/state/savepoints.zh.md ##
@@ -78,160 +68,158 @@ source-id | State of StatefulSource
 mapper-id | State of StatefulMapper
 {% endhighlight %}
-In the above example, the print sink is stateless and hence not part of the savepoint state. By default, we try to map each entry of the savepoint back to the new program.
+在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。
-## Operations
+## 算子
-You can use the [command line client]({{ site.baseurl }}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a savepoint*, *resume from savepoints*, and *dispose savepoints*.
+你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint *,*取消具有 Savepoint *的作业,*从 Savepoint *恢复,以及*部署 Savepoint *。
-With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the webui.
+使用 Flink >= 1.2.0,还可以使用 webui *从 Savepoint 恢复*。
-### Triggering Savepoints
+### 触发 Savepoint
-When triggering a savepoint, a new savepoint directory is created where the data as well as the meta data will be stored. The location of this directory can be controlled by [configuring a default target directory](#configuration) or by specifying a custom target directory with the trigger commands (see the [`:targetDirectory` argument](#trigger-a-savepoint)).
+当触发 Savepoint 时,将创建一个新的 Savepoint 目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory `参数](#trigger-a-savepoint)来控制该目录的位置。
-Attention: The target directory has to be a location accessible by both the JobManager(s) and TaskManager(s) e.g. a location on a distributed file-system.
+注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统上的位置。
-For example with a `FsStateBackend` or `RocksDBStateBackend`:
+例如,使用 `FsStateBackend` 或 `RocksDBStateBackend` :
 {% highlight shell %}
-# Savepoint target directory
-/savepoints/
+# Savepoint 目标目录
+/Savepoint/
-# Savepoint directory
-/savepoints/savepoint-:shortjobid-:savepointid/
+# Savepoint 目录
+/Savepoint/savepoint-:shortjobid-:savepointid/
-# Savepoint file contains the checkpoint meta data
-/savepoints/savepoint-:shortjobid-:savepointid/_metadata
+# Savepoint 文件包含 Checkpoint元数据
+/Savepoint/savepoint-:shortjobid-:savepointid/_metadata
-# Savepoint state
-/savepoints/savepoint-:shortjobid-:savepointid/...
+# Savepoint 状态
+/Savepoint/savepoint-:shortjobid-:savepointid/...
 {% endhighlight %}
- Note:
-Although it looks as if the savepoints may be moved, it is currently not possible due to absolute paths in the _metadata file.
-Please follow https://issues.apache.org/jira/browse/FLINK-5778;>FLINK-5778 for progress on lifting this restriction.
+ 注意:
+虽然看起来好像可以移动 Savepoint ,但由于 _metadata 文件中的绝对路径,目前无法进行保存。
+请按照https://issues.apache.org/jira/browse/FLINK-5778;> FLINK-5778 了解取消此限制的进度。
-
-Note that if you use the `MemoryStateBackend`, metadata *and* savepoint state will be stored in the `_metadata` file. Since it is self-contained, you may move the file and restore from any location.
+请注意,如果使用 `MemoryStateBackend`,则元数据*和* Savepoint 状态将存储在`_metadata`文件中。 由于它是自包含的,你可以移动文件并从任何位置恢复。
- Attention: It is discouraged to move or delete the last savepoint of a running job, because this might interfere with failure-recovery. Savepoints have side-effects on exactly-once sinks, therefore
- to ensure exactly-once semantics, if there is no checkpoint after the last savepoint, the savepoint will be used for recovery.
+ 注意: 不建议移动或删除正在运行作业的最后一个 Savepoint ,因为这可能会干扰故障恢复。因此,Savepoint 对完全一次的接收器有副作用,为了确保精确的一次语义,如果在最后一个 Savepoint 之后没有 Checkpoint ,那么将使用 Savepoint 进行恢复。

Review comment:
`完全一次` --> `精确一次`
`精确的一次语义` --> `精确一次的语义`
[GitHub] [flink] klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
klion26 commented on a change in pull request #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#discussion_r280980635

## File path: docs/ops/state/savepoints.zh.md ##
@@ -78,160 +68,158 @@ source-id | State of StatefulSource
 mapper-id | State of StatefulMapper
 {% endhighlight %}
-In the above example, the print sink is stateless and hence not part of the savepoint state. By default, we try to map each entry of the savepoint back to the new program.
+在上面的示例中,print sink 是无状态的,因此不是 Savepoint 状态的一部分。默认情况下,我们尝试将 Savepoint 的每个条目映射回新程序。
-## Operations
+## 算子
-You can use the [command line client]({{ site.baseurl }}/ops/cli.html#savepoints) to *trigger savepoints*, *cancel a job with a savepoint*, *resume from savepoints*, and *dispose savepoints*.
+你可以使用[命令行客户端]({{site.baseurl}}/zh/ops/cli.html#Savepoint)来*触发 Savepoint *,*取消具有 Savepoint *的作业,*从 Savepoint *恢复,以及*部署 Savepoint *。
-With Flink >= 1.2.0 it is also possible to *resume from savepoints* using the webui.
+使用 Flink >= 1.2.0,还可以使用 webui *从 Savepoint 恢复*。
-### Triggering Savepoints
+### 触发 Savepoint
-When triggering a savepoint, a new savepoint directory is created where the data as well as the meta data will be stored. The location of this directory can be controlled by [configuring a default target directory](#configuration) or by specifying a custom target directory with the trigger commands (see the [`:targetDirectory` argument](#trigger-a-savepoint)).
+当触发 Savepoint 时,将创建一个新的 Savepoint 目录,其中存储数据和元数据。可以通过[配置默认目标目录](#configuration)或使用触发器命令指定自定义目标目录(参见[`:targetDirectory `参数](#trigger-a-savepoint)来控制该目录的位置。
-Attention: The target directory has to be a location accessible by both the JobManager(s) and TaskManager(s) e.g. a location on a distributed file-system.
+注意:目标目录必须是 JobManager(s) 和 TaskManager(s) 都可以访问的位置,例如分布式文件系统上的位置。
-For example with a `FsStateBackend` or `RocksDBStateBackend`:
+例如,使用 `FsStateBackend` 或 `RocksDBStateBackend` :
 {% highlight shell %}
-# Savepoint target directory
-/savepoints/
+# Savepoint 目标目录
+/Savepoint/
-# Savepoint directory
-/savepoints/savepoint-:shortjobid-:savepointid/
+# Savepoint 目录
+/Savepoint/savepoint-:shortjobid-:savepointid/
-# Savepoint file contains the checkpoint meta data
-/savepoints/savepoint-:shortjobid-:savepointid/_metadata
+# Savepoint 文件包含 Checkpoint元数据
+/Savepoint/savepoint-:shortjobid-:savepointid/_metadata
-# Savepoint state
-/savepoints/savepoint-:shortjobid-:savepointid/...
+# Savepoint 状态
+/Savepoint/savepoint-:shortjobid-:savepointid/...
 {% endhighlight %}
- Note:
-Although it looks as if the savepoints may be moved, it is currently not possible due to absolute paths in the _metadata file.
-Please follow https://issues.apache.org/jira/browse/FLINK-5778;>FLINK-5778 for progress on lifting this restriction.
+ 注意:
+虽然看起来好像可以移动 Savepoint ,但由于 _metadata 文件中的绝对路径,目前无法进行保存。
+请按照https://issues.apache.org/jira/browse/FLINK-5778;> FLINK-5778 了解取消此限制的进度。
-
-Note that if you use the `MemoryStateBackend`, metadata *and* savepoint state will be stored in the `_metadata` file. Since it is self-contained, you may move the file and restore from any location.
+请注意,如果使用 `MemoryStateBackend`,则元数据*和* Savepoint 状态将存储在`_metadata`文件中。 由于它是自包含的,你可以移动文件并从任何位置恢复。
- Attention: It is discouraged to move or delete the last savepoint of a running job, because this might interfere with failure-recovery. Savepoints have side-effects on exactly-once sinks, therefore
- to ensure exactly-once semantics, if there is no checkpoint after the last savepoint, the savepoint will be used for recovery.
+ 注意: 不建议移动或删除正在运行作业的最后一个 Savepoint ,因为这可能会干扰故障恢复。因此,Savepoint 对完全一次的接收器有副作用,为了确保精确的一次语义,如果在最后一个 Savepoint 之后没有 Checkpoint ,那么将使用 Savepoint 进行恢复。
- Trigger a Savepoint
+
+ 触发 Savepoint
 {% highlight shell %}
 $ bin/flink savepoint :jobId [:targetDirectory]
 {% endhighlight %}
-This will trigger a savepoint for the job with ID `:jobId`, and returns the path of the created savepoint. You need this path to restore and dispose savepoints.
+这将触发ID为`:jobId`的作业的Savepoint,并返回创建的 Savepoint 的路径。 你需要此路径来还原和部署 Savepoint 。
- Trigger a Savepoint with YARN
+ 使用 YARN 触发 Savepoint
 {% highlight shell %}
 $ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
 {% endhighlight %}
-This will trigger a savepoint for the job with ID `:jobId` and YARN application ID `:yarnAppId`, and returns the path of the created savepoint.
+这将触发 ID 为`:jobId` 和 YARN 应用程序 ID `:yarnAppId`的作业的 Savepoint,并返回创建的 Savepoint 的路径。
- Cancel Job with Savepoint
+ 使用 Savepoint 取消作业
 {% highlight shell %}
 $ bin/flink cancel -s [:targetDirectory] :jobId
 {% endhighlight %}
-This will atomically trigger a savepoint for the job with ID `:jobid` and cancel the job. Furthermore, you can specify a target file system directory to store the savepoint in. The directory
[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese
URL: https://github.com/apache/flink/pull/8341#discussion_r280980168

## File path: docs/dev/stream/state/state.zh.md ##
@@ -22,122 +22,87 @@ specific language governing permissions and limitations
 under the License.
 -->
-This document explains how to use Flink's state abstractions when developing an application.
-
-* ToC
+本文档主要介绍如何在 Flink 作业中使用状态
+* 目录
 {:toc}
 ## Keyed State and Operator State
-There are two basic kinds of state in Flink: `Keyed State` and `Operator State`.
+Flink 中有两种基本的状态:`Keyed State` 和 `Operator State`
 ### Keyed State
-*Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`.
+*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。
-You can think of Keyed State as Operator State that has been partitioned,
-or sharded, with exactly one state-partition per key.
-Each keyed-state is logically bound to a unique
-composite of , and since each key
-"belongs" to exactly one parallel instance of a keyed operator, we can
-think of this simply as .
+你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。
+逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于"
+算子的一个并发,因此简化为 <算子, key>。
-Keyed State is further organized into so-called *Key Groups*. Key Groups are the
-atomic unit by which Flink can redistribute Keyed State;
-there are exactly as many Key Groups as the defined maximum parallelism.
-During execution each parallel instance of a keyed operator works with the keys
-for one or more Key Groups.
+Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元;
+Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group
 ### Operator State
-With *Operator State* (or *non-keyed state*), each operator state is
-bound to one parallel operator instance.
-The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good motivating example for the use of Operator State
-in Flink. Each parallel instance of the Kafka consumer maintains a map
-of topic partitions and offsets as its Operator State.
+对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。
+[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 operator state 的一个很好的示例。
+每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。
-The Operator State interfaces support redistributing state among
-parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution.
+Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。
 ## Raw and Managed State
-*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
+*Keyed State* 和 *Operator State* 分别有两种存在形式:*managed* and *raw*.
-*Managed State* is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB.
-Examples are "ValueState", "ListState", etc. Flink's runtime encodes
-the states and writes them into the checkpoints.
+*Managed State* 有 Flink runtime 中的数据结构所控制,比如内部的 hash table 或者 RocksDB。
+比如 "ValueState", "ListState" 等。Flink runtime 会对这些状态进行编码并写入 checkpoint。
-*Raw State* is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into
-the checkpoint. Flink knows nothing about the state's data structures and sees only the raw bytes.
+*Raw State* 则保存在算子自己的数据结构中。checkpoint 的时候,Flink 并不知晓具体的内容,仅仅写入一串字节序列到 checkpoint。
-All datastream functions can use managed state, but the raw state interfaces can only be used when implementing operators.
-Using managed state (rather than raw state) is recommended, since with
-managed state Flink is able to automatically redistribute state when the parallelism is
-changed, and also do better memory management.
+所有 datastream 的方法都可以使用 managed state, 但是 raw state 则只能在实现算子的时候使用。
+由于 Flink 可以在修改并发是更好的重新分发状态数据,并且能够更好的管理内存,因此建议使用 managed state(而不是 raw state)。
-Attention If your managed state needs custom serialization logic, please see
-the [corresponding guide](custom_serialization.html) in order to ensure future compatibility. Flink's default serializers
-don't need special treatment.
+注意 如果你的 managed state 需要定制化的序列化逻辑,
+为了后续的兼容性请参考 [corresponding guide](custom_serialization.html),Flink 默认提供的序列化器不需要用户做特殊的处理。
 ## Using Managed Keyed State
-The managed keyed state interface provides access to different types of state that are all scoped to
-the key of the current input element. This means that this type of state can only be used
-on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
+managed keyed state 接口提供不同类型状态访问的接口,这些状态都限定于当前的输入数据。换句话说,这些状态仅可在 `KeyedStream`

Review comment:
`不同类型状态访问的接口` --> `不同类型状态的访问接口`
`这些状态都限定于当前的输入数据` --> `这些状态以当前输入数据为键`
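
The Key Group paragraph in the quoted documentation (as many Key Groups as the maximum parallelism, each parallel instance working on one or more groups) can be made concrete with a small sketch. It rests on assumptions: `KeyGroupSketch`, `keyGroupFor`, and `operatorIndexFor` are hypothetical names, the key-to-group step uses a plain `hashCode` modulo as a simplified stand-in for whatever hashing Flink really applies, and the contiguous-range mapping from group to instance is an assumed scheme chosen for illustration.

```java
/** Hypothetical illustration of key groups; not the Flink implementation. */
final class KeyGroupSketch {

    private KeyGroupSketch() {}

    /** Maps a key to one of maxParallelism key groups (simplified hashing, an assumption). */
    static int keyGroupFor(Object key, int maxParallelism) {
        // floorMod keeps the result in [0, maxParallelism) even for negative hash codes.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Assigns each key group to a parallel instance in contiguous ranges (assumed scheme). */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With a maximum parallelism of 128 and a parallelism of 4, this sketch places key groups 0 to 31 on instance 0, 32 to 63 on instance 1, and so on. Changing the parallelism moves whole key groups between instances while the group of any given key stays fixed, which is why the key group is described as the atomic unit of redistribution.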
[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese
URL: https://github.com/apache/flink/pull/8341#discussion_r280980039

## File path: docs/dev/stream/state/state.zh.md ##
@@ -22,122 +22,87 @@ specific language governing permissions and limitations
 under the License.
 -->
-This document explains how to use Flink's state abstractions when developing an application.
-
-* ToC
+本文档主要介绍如何在 Flink 作业中使用状态
+* 目录
 {:toc}
 ## Keyed State and Operator State
-There are two basic kinds of state in Flink: `Keyed State` and `Operator State`.
+Flink 中有两种基本的状态:`Keyed State` 和 `Operator State`
 ### Keyed State
-*Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`.
+*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。
-You can think of Keyed State as Operator State that has been partitioned,
-or sharded, with exactly one state-partition per key.
-Each keyed-state is logically bound to a unique
-composite of , and since each key
-"belongs" to exactly one parallel instance of a keyed operator, we can
-think of this simply as .
+你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。
+逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于"
+算子的一个并发,因此简化为 <算子, key>。
-Keyed State is further organized into so-called *Key Groups*. Key Groups are the
-atomic unit by which Flink can redistribute Keyed State;
-there are exactly as many Key Groups as the defined maximum parallelism.
-During execution each parallel instance of a keyed operator works with the keys
-for one or more Key Groups.
+Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元;
+Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group
 ### Operator State
-With *Operator State* (or *non-keyed state*), each operator state is
-bound to one parallel operator instance.
-The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good motivating example for the use of Operator State
-in Flink. Each parallel instance of the Kafka consumer maintains a map
-of topic partitions and offsets as its Operator State.
+对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。
+[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 operator state 的一个很好的示例。
+每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。
-The Operator State interfaces support redistributing state among
-parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution.
+Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。
 ## Raw and Managed State
-*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
+*Keyed State* 和 *Operator State* 分别有两种存在形式:*managed* and *raw*.
-*Managed State* is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB.
-Examples are "ValueState", "ListState", etc. Flink's runtime encodes
-the states and writes them into the checkpoints.
+*Managed State* 有 Flink runtime 中的数据结构所控制,比如内部的 hash table 或者 RocksDB。

Review comment:
`有 Flink runtime ` --> `由 Flink runtime `
[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese URL: https://github.com/apache/flink/pull/8341#discussion_r280979940 ## File path: docs/dev/stream/state/state.zh.md ## @@ -22,122 +22,87 @@ specific language governing permissions and limitations under the License. --> -This document explains how to use Flink's state abstractions when developing an application. - -* ToC +本文档主要介绍如何在 Flink 作业中使用状态 +* 目录 {:toc} ## Keyed State and Operator State -There are two basic kinds of state in Flink: `Keyed State` and `Operator State`. +Flink 中有两种基本的状态:`Keyed State` 和 `Operator State` ### Keyed State -*Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`. +*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。 -You can think of Keyed State as Operator State that has been partitioned, -or sharded, with exactly one state-partition per key. -Each keyed-state is logically bound to a unique -composite of <parallel-operator-instance, key>, and since each key -"belongs" to exactly one parallel instance of a keyed operator, we can -think of this simply as <operator, key>. +你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。 +逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于" +算子的一个并发,因此简化为 <算子, key>。 -Keyed State is further organized into so-called *Key Groups*. Key Groups are the -atomic unit by which Flink can redistribute Keyed State; -there are exactly as many Key Groups as the defined maximum parallelism. -During execution each parallel instance of a keyed operator works with the keys -for one or more Key Groups. +Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元; +Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group ### Operator State -With *Operator State* (or *non-keyed state*), each operator state is -bound to one parallel operator instance.
-The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good motivating example for the use of Operator State -in Flink. Each parallel instance of the Kafka consumer maintains a map -of topic partitions and offsets as its Operator State. +对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。 +[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 operator state 的一个很好的示例。 Review comment: `是 Flink 中使用 operator state 的一个很好的示例。` --> `是 Flink 中一个使用 operator state 的好示例。`
[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese URL: https://github.com/apache/flink/pull/8341#discussion_r280979906 ## File path: docs/dev/stream/state/state.zh.md ## @@ -22,122 +22,87 @@ specific language governing permissions and limitations under the License. --> -This document explains how to use Flink's state abstractions when developing an application. - -* ToC +本文档主要介绍如何在 Flink 作业中使用状态 +* 目录 {:toc} ## Keyed State and Operator State -There are two basic kinds of state in Flink: `Keyed State` and `Operator State`. +Flink 中有两种基本的状态:`Keyed State` 和 `Operator State` ### Keyed State -*Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`. +*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。 -You can think of Keyed State as Operator State that has been partitioned, -or sharded, with exactly one state-partition per key. -Each keyed-state is logically bound to a unique -composite of <parallel-operator-instance, key>, and since each key -"belongs" to exactly one parallel instance of a keyed operator, we can -think of this simply as <operator, key>. +你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。 +逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于" +算子的一个并发,因此简化为 <算子, key>。 -Keyed State is further organized into so-called *Key Groups*. Key Groups are the -atomic unit by which Flink can redistribute Keyed State; -there are exactly as many Key Groups as the defined maximum parallelism. -During execution each parallel instance of a keyed operator works with the keys -for one or more Key Groups. +Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元; +Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group Review comment: `Key Groups` can be `Key Group`?
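The Key Group paragraph quoted in the diff above says that the number of Key Groups equals the maximum parallelism and that each parallel instance handles one or more of them. A minimal sketch of that idea follows. The class and method names are hypothetical, and it assumes a plain `hashCode()` modulo and a simple proportional mapping for illustration; it is not presented as Flink's actual assignment code.

```java
/** Hypothetical illustration of key groups; not Flink's real implementation. */
final class KeyGroupSketch {

    /** Maps a key to one of maxParallelism key groups (assumption: plain hashCode). */
    static int keyGroupFor(Object key, int maxParallelism) {
        // floorMod keeps the result in [0, maxParallelism) even for negative hash codes
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to a parallel instance so that each instance owns a contiguous range. */
    static int operatorIndexFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

With a maximum parallelism of 128 and a parallelism of 4, this sketch gives each instance a contiguous block of 32 key groups, so rescaling only moves whole key groups between instances.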
[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese URL: https://github.com/apache/flink/pull/8341#discussion_r280980096 ## File path: docs/dev/stream/state/state.zh.md ## @@ -22,122 +22,87 @@ specific language governing permissions and limitations under the License. --> -This document explains how to use Flink's state abstractions when developing an application. - -* ToC +本文档主要介绍如何在 Flink 作业中使用状态 +* 目录 {:toc} ## Keyed State and Operator State -There are two basic kinds of state in Flink: `Keyed State` and `Operator State`. +Flink 中有两种基本的状态:`Keyed State` 和 `Operator State` ### Keyed State -*Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`. +*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。 -You can think of Keyed State as Operator State that has been partitioned, -or sharded, with exactly one state-partition per key. -Each keyed-state is logically bound to a unique -composite of <parallel-operator-instance, key>, and since each key -"belongs" to exactly one parallel instance of a keyed operator, we can -think of this simply as <operator, key>. +你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。 +逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于" +算子的一个并发,因此简化为 <算子, key>。 -Keyed State is further organized into so-called *Key Groups*. Key Groups are the -atomic unit by which Flink can redistribute Keyed State; -there are exactly as many Key Groups as the defined maximum parallelism. -During execution each parallel instance of a keyed operator works with the keys -for one or more Key Groups. +Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元; +Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group ### Operator State -With *Operator State* (or *non-keyed state*), each operator state is -bound to one parallel operator instance.
-The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good motivating example for the use of Operator State -in Flink. Each parallel instance of the Kafka consumer maintains a map -of topic partitions and offsets as its Operator State. +对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。 +[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 operator state 的一个很好的示例。 +每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。 -The Operator State interfaces support redistributing state among -parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution. +Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。 ## Raw and Managed State -*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*. +*Keyed State* 和 *Operator State* 分别有两种存在形式:*managed* and *raw*. -*Managed State* is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB. -Examples are "ValueState", "ListState", etc. Flink's runtime encodes -the states and writes them into the checkpoints. +*Managed State* 有 Flink runtime 中的数据结构所控制,比如内部的 hash table 或者 RocksDB。 +比如 "ValueState", "ListState" 等。Flink runtime 会对这些状态进行编码并写入 checkpoint。 -*Raw State* is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into -the checkpoint. Flink knows nothing about the state's data structures and sees only the raw bytes. +*Raw State* 则保存在算子自己的数据结构中。checkpoint 的时候,Flink 并不知晓具体的内容,仅仅写入一串字节序列到 checkpoint。 -All datastream functions can use managed state, but the raw state interfaces can only be used when implementing operators. -Using managed state (rather than raw state) is recommended, since with -managed state Flink is able to automatically redistribute state when the parallelism is -changed, and also do better memory management. 
+所有 datastream 的方法都可以使用 managed state, 但是 raw state 则只能在实现算子的时候使用。 +由于 Flink 可以在修改并发是更好的重新分发状态数据,并且能够更好的管理内存,因此建议使用 managed state(而不是 raw state)。 -Attention If your managed state needs custom serialization logic, please see -the [corresponding guide](custom_serialization.html) in order to ensure future compatibility. Flink's default serializers -don't need special treatment. +注意 如果你的 managed state 需要定制化的序列化逻辑, +为了后续的兼容性请参考 [corresponding guide](custom_serialization.html),Flink 默认提供的序列化器不需要用户做特殊的处理。 Review comment: `corresponding guide` --> `相应指南` `Flink 默认提供的序列化器` --> `Flink 的默认序列化器`
[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese URL: https://github.com/apache/flink/pull/8341#discussion_r280979970 ## File path: docs/dev/stream/state/state.zh.md ## @@ -22,122 +22,87 @@ specific language governing permissions and limitations under the License. --> -This document explains how to use Flink's state abstractions when developing an application. - -* ToC +本文档主要介绍如何在 Flink 作业中使用状态 +* 目录 {:toc} ## Keyed State and Operator State -There are two basic kinds of state in Flink: `Keyed State` and `Operator State`. +Flink 中有两种基本的状态:`Keyed State` 和 `Operator State` ### Keyed State -*Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`. +*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。 -You can think of Keyed State as Operator State that has been partitioned, -or sharded, with exactly one state-partition per key. -Each keyed-state is logically bound to a unique -composite of <parallel-operator-instance, key>, and since each key -"belongs" to exactly one parallel instance of a keyed operator, we can -think of this simply as <operator, key>. +你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。 +逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于" +算子的一个并发,因此简化为 <算子, key>。 -Keyed State is further organized into so-called *Key Groups*. Key Groups are the -atomic unit by which Flink can redistribute Keyed State; -there are exactly as many Key Groups as the defined maximum parallelism. -During execution each parallel instance of a keyed operator works with the keys -for one or more Key Groups. +Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元; +Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group ### Operator State -With *Operator State* (or *non-keyed state*), each operator state is -bound to one parallel operator instance.
-The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good motivating example for the use of Operator State -in Flink. Each parallel instance of the Kafka consumer maintains a map -of topic partitions and offsets as its Operator State. +对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。 +[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 operator state 的一个很好的示例。 +每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。 -The Operator State interfaces support redistributing state among -parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution. +Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。 Review comment: `Operator State 在 Flink 作业的并发改变后` --> `Operator State 在 Flink 作业改变并发后`
[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese URL: https://github.com/apache/flink/pull/8341#discussion_r280980068 ## File path: docs/dev/stream/state/state.zh.md ## @@ -22,122 +22,87 @@ specific language governing permissions and limitations under the License. --> -This document explains how to use Flink's state abstractions when developing an application. - -* ToC +本文档主要介绍如何在 Flink 作业中使用状态 +* 目录 {:toc} ## Keyed State and Operator State -There are two basic kinds of state in Flink: `Keyed State` and `Operator State`. +Flink 中有两种基本的状态:`Keyed State` 和 `Operator State` ### Keyed State -*Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`. +*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。 -You can think of Keyed State as Operator State that has been partitioned, -or sharded, with exactly one state-partition per key. -Each keyed-state is logically bound to a unique -composite of <parallel-operator-instance, key>, and since each key -"belongs" to exactly one parallel instance of a keyed operator, we can -think of this simply as <operator, key>. +你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。 +逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于" +算子的一个并发,因此简化为 <算子, key>。 -Keyed State is further organized into so-called *Key Groups*. Key Groups are the -atomic unit by which Flink can redistribute Keyed State; -there are exactly as many Key Groups as the defined maximum parallelism. -During execution each parallel instance of a keyed operator works with the keys -for one or more Key Groups. +Keyed State 会按照 *Key Group* 进行管理。Key Group 是 Flink 分发 Keyed State 的最小单元; +Key Groups 的数目等于作业的最大并发数。在执行过程中,每个 keyed operator 会对应到一个或多个 Key Group ### Operator State -With *Operator State* (or *non-keyed state*), each operator state is -bound to one parallel operator instance.
-The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good motivating example for the use of Operator State -in Flink. Each parallel instance of the Kafka consumer maintains a map -of topic partitions and offsets as its Operator State. +对于 *Operator State* (或者 *non-keyed state*) 来说,每个 operator state 和一个并发实例进行绑定。 +[Kafka Connector]({{ site.baseurl }}/zh/dev/connectors/kafka.html) 是 Flink 中使用 operator state 的一个很好的示例。 +每个 Kafka 消费者的并发在 Operator State 中维护一个 topic partition 到 offset 的映射关系。 -The Operator State interfaces support redistributing state among -parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution. +Operator State 在 Flink 作业的并发改变后,会重新分发状态,分发的策略和 Keyed State 不一样。 ## Raw and Managed State -*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*. +*Keyed State* 和 *Operator State* 分别有两种存在形式:*managed* and *raw*. -*Managed State* is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB. -Examples are "ValueState", "ListState", etc. Flink's runtime encodes -the states and writes them into the checkpoints. +*Managed State* 有 Flink runtime 中的数据结构所控制,比如内部的 hash table 或者 RocksDB。 +比如 "ValueState", "ListState" 等。Flink runtime 会对这些状态进行编码并写入 checkpoint。 -*Raw State* is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into -the checkpoint. Flink knows nothing about the state's data structures and sees only the raw bytes. +*Raw State* 则保存在算子自己的数据结构中。checkpoint 的时候,Flink 并不知晓具体的内容,仅仅写入一串字节序列到 checkpoint。 -All datastream functions can use managed state, but the raw state interfaces can only be used when implementing operators. -Using managed state (rather than raw state) is recommended, since with -managed state Flink is able to automatically redistribute state when the parallelism is -changed, and also do better memory management. 
+所有 datastream 的方法都可以使用 managed state, 但是 raw state 则只能在实现算子的时候使用。 +由于 Flink 可以在修改并发是更好的重新分发状态数据,并且能够更好的管理内存,因此建议使用 managed state(而不是 raw state)。 Review comment: `由于 Flink 可以在修改并发是更好的重新分发状态数据` --> `由于 Flink 可以在修改并发时更好的分发状态数据`
[GitHub] [flink] klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese
klion26 commented on a change in pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese URL: https://github.com/apache/flink/pull/8341#discussion_r280979872 ## File path: docs/dev/stream/state/state.zh.md ## @@ -22,122 +22,87 @@ specific language governing permissions and limitations under the License. --> -This document explains how to use Flink's state abstractions when developing an application. - -* ToC +本文档主要介绍如何在 Flink 作业中使用状态 +* 目录 {:toc} ## Keyed State and Operator State -There are two basic kinds of state in Flink: `Keyed State` and `Operator State`. +Flink 中有两种基本的状态:`Keyed State` 和 `Operator State` ### Keyed State -*Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`. +*Keyed State* 通常和键相关,仅可使用在 `KeyedStream` 的方法和算子中。 -You can think of Keyed State as Operator State that has been partitioned, -or sharded, with exactly one state-partition per key. -Each keyed-state is logically bound to a unique -composite of <parallel-operator-instance, key>, and since each key -"belongs" to exactly one parallel instance of a keyed operator, we can -think of this simply as <operator, key>. +你可以把 Keyed State 看作分区或者共享的 Operator State, 而且每个键仅出现在一个分区内。 +逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定,由于每个 key 仅"属于" Review comment: maybe `逻辑上每个 keyed-state 和 <算子并发实例, key> 相绑定` can be `逻辑上每个 keyed-state 和 <算子并发实例, key> 绑定` `由于每个 key 仅"属于"` --> `另外每个 key 仅"属于"`
[jira] [Created] (FLINK-12400) NullpointerException using SimpleStringSchema with Kafka
Pierre Zemb created FLINK-12400: --- Summary: NullpointerException using SimpleStringSchema with Kafka Key: FLINK-12400 URL: https://issues.apache.org/jira/browse/FLINK-12400 Project: Flink Issue Type: Improvement Components: API / Type Serialization System Affects Versions: 1.8.0, 1.7.2 Environment: Flink 1.7.2 job on 1.8 cluster Kafka 0.10 with a topic in log-compaction Reporter: Pierre Zemb Assignee: Pierre Zemb Hi! Yesterday, we saw strange behavior with our Flink job and Kafka. We are consuming a Kafka topic set up in [log-compaction|https://kafka.apache.org/documentation/#compaction] mode. As such, sending a message with a null payload acts as a tombstone. We are consuming Kafka like this: {code:java} new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), this.kafkaProperties) {code} When we sent such a message, the job failed because of a NullPointerException [here|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java#L75]: `byte[] message` was null, causing the NPE. We forked the class and added a basic null check that returns null when the message is null. This fixed our issue. Should we add it to the main class? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
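The null check described in the ticket can be sketched roughly as follows. This is a standalone illustration, not the reporter's fork and not the Flink class itself: `NullTolerantStringDeserializer` is a hypothetical name, and the class only mirrors the shape of a `deserialize(byte[])` method so that it runs without Flink on the classpath.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a string schema with a null-payload guard. */
final class NullTolerantStringDeserializer {

    private final Charset charset;

    NullTolerantStringDeserializer(Charset charset) {
        this.charset = charset;
    }

    /** Decodes the payload, or returns null for a null payload (e.g. a tombstone record). */
    String deserialize(byte[] message) {
        if (message == null) {
            // Without this guard, new String(null, charset) throws a NullPointerException
            return null;
        }
        return new String(message, charset);
    }
}
```

Calling `new NullTolerantStringDeserializer(StandardCharsets.UTF_8).deserialize(null)` returns null instead of throwing. How the caller treats a null record is outside the scope of this sketch.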
[GitHub] [flink] PierreZ commented on a change in pull request #8272: [FLINK-12274] Fix documentation to enable Queryable State
PierreZ commented on a change in pull request #8272: [FLINK-12274] Fix documentation to enable Queryable State URL: https://github.com/apache/flink/pull/8272#discussion_r280978158 ## File path: docs/dev/stream/state/queryable_state.md ## @@ -69,10 +69,12 @@ response back to the client. ## Activating Queryable State -To enable queryable state on your Flink cluster, you just have to copy the -`flink-queryable-state-runtime{{ site.scala_version_suffix }}-{{site.version }}.jar` +To enable queryable state on your Flink cluster, you need to do the following: + + 1. copy the `flink-queryable-state-runtime{{ site.scala_version_suffix }}-{{site.version }}.jar` from the `opt/` folder of your [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads"), -to the `lib/` folder. Otherwise, the queryable state feature is not enabled. +to the `lib/` folder. + 2. set the property `queryable-state.enable` to `true` in `./conf/flink-conf.yaml`. See the [Configuration]({{ site.baseurl }}/ops/config.html#queryable-state) documentation for details and additional parameters. Review comment: Thanks to both of you, I'm dropping the last part of the sentence!
[jira] [Commented] (FLINK-12392) FlinkRelMetadataQuery does not compile with Scala 2.12
[ https://issues.apache.org/jira/browse/FLINK-12392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16833022#comment-16833022 ] Jark Wu commented on FLINK-12392: - cc [~godfreyhe] > FlinkRelMetadataQuery does not compile with Scala 2.12 > -- > > Key: FLINK-12392 > URL: https://issues.apache.org/jira/browse/FLINK-12392 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Priority: Major > Fix For: 1.9.0 > > > {code} > 10:57:51.770 [ERROR] > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala:52: > error: value EMPTY in class RelMetadataQuery cannot be accessed in object > org.apache.calcite.rel.metadata.RelMetadataQuery > 10:57:51.770 [ERROR] Access to protected value EMPTY not permitted because > 10:57:51.770 [ERROR] enclosing package metadata in package plan is not a > subclass of > 10:57:51.770 [ERROR] class RelMetadataQuery in package metadata where target > is defined > 10:57:51.770 [ERROR] this(RelMetadataQuery.THREAD_PROVIDERS.get, > RelMetadataQuery.EMPTY) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] klion26 commented on issue #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese
klion26 commented on issue #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese URL: https://github.com/apache/flink/pull/8341#issuecomment-489305163 This page contains a lot of content, so I'll recheck the translation. Any reviews are welcome. @wuchong
[jira] [Updated] (FLINK-11633) Translate "Working with State" into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11633: --- Labels: pull-request-available (was: ) > Translate "Working with State" into Chinese > --- > > Key: FLINK-11633 > URL: https://issues.apache.org/jira/browse/FLINK-11633 > Project: Flink > Issue Type: Sub-task > Components: chinese-translation, Documentation >Reporter: Congxian Qiu(klion26) >Assignee: Congxian Qiu(klion26) >Priority: Major > Labels: pull-request-available > > The doc is located in flink/doc/dev/state/state.md -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese
flinkbot commented on issue #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese URL: https://github.com/apache/flink/pull/8341#issuecomment-489305060
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.
## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.
Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] klion26 opened a new pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese
klion26 opened a new pull request #8341: [FLINK-11633][docs-zh] Translate "Working with state" into Chinese URL: https://github.com/apache/flink/pull/8341
## What is the purpose of the change
Translate "Working with state" into Chinese
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
[GitHub] [flink] Xeli commented on a change in pull request #8259: [FLINK-12325][metrics] Fix bug in statsd exporter when using negative values
Xeli commented on a change in pull request #8259: [FLINK-12325][metrics] Fix bug in statsd exporter when using negative values URL: https://github.com/apache/flink/pull/8259#discussion_r280971043 ## File path: flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java ## @@ -240,12 +240,6 @@ public String filterCharacters(String input) { } private boolean numberIsNegative(Number input) { - try { - return new BigDecimal(input.toString()).compareTo(BigDecimal.ZERO) == -1; - } catch (Exception e) { - //not all Number's can be converted to a BigDecimal, such as Infinity or NaN - //in this case we just say it isn't a negative number - return false; - } + return input.doubleValue() < 0; Review comment: Ah sorry, no reason! I've changed it to `Double.compare()`, and I've also changed line #190 to use this method so that the double check there works the same way.
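For reference, a `Double.compare()`-based negativity check along the lines mentioned in the comment might look like the sketch below. The class name `NegativeCheckSketch` is hypothetical and the exact form used in the PR is not shown in this thread, so treat this as an assumption about the approach rather than the merged code.

```java
/** Hypothetical standalone version of the negativity check discussed above. */
final class NegativeCheckSketch {

    /** Returns true when the value sorts below 0.0 under Double.compare ordering. */
    static boolean numberIsNegative(Number input) {
        // Double.compare places NaN above every other value, so NaN is reported as not negative;
        // it also places -0.0 below 0.0, so negative zero is reported as negative.
        return Double.compare(input.doubleValue(), 0) < 0;
    }
}
```

One difference from the `input.doubleValue() < 0` form in the diff is that `Double.compare` distinguishes `-0.0` from `0.0`, so negative zero counts as negative here. Both forms treat `NaN` as not negative and negative infinity as negative, without the exception handling the `BigDecimal` version needed.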