[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834774#comment-15834774
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3182
  
Rebased.


> setMaxParallelism() higher than 1 is possible on non-parallel operators
> ---
>
> Key: FLINK-5473
> URL: https://issues.apache.org/jira/browse/FLINK-5473
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Stefan Richter
>
> While trying out Flink 1.2, I found out that you can set a maxParallelism 
> higher than 1 on a non-parallel operator.
> I think we should have the same semantics as the setParallelism() method.
> Also, when setting a global maxParallelism in the execution environment, it 
> will be set as a default value for the non-parallel operator.
> When restoring a savepoint from 1.1, you have to set the maxParallelism to 
> the parallelism of the 1.1 job. Non-parallel operators will then also get the 
> maxParallelism set to this value, leading to an error on restore.
> So currently, users restoring from 1.1 to 1.2 have to manually set the 
> maxParallelism to 1 for all non-parallel operators.
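The following Java sketch models the proposed semantics outside of Flink. `OperatorSettings` and its methods are hypothetical names and are not part of the DataStream API. The sketch assumes only what the report states: a non-parallel operator should reject a max parallelism other than 1, in the same way that it rejects a parallelism other than 1, and a global default set in the environment should not be applied to it.

```java
/**
 * Hypothetical model of the semantics proposed in FLINK-5473 (not Flink's API):
 * a non-parallel operator is pinned to parallelism 1 and, by the same rule,
 * to max parallelism 1.
 */
final class OperatorSettings {
    static final int VALUE_NOT_SET = -1;

    private final boolean nonParallel;
    private int parallelism = 1;
    private int maxParallelism = VALUE_NOT_SET;

    OperatorSettings(boolean nonParallel) {
        this.nonParallel = nonParallel;
    }

    OperatorSettings setParallelism(int parallelism) {
        if (parallelism < 1 || (nonParallel && parallelism != 1)) {
            throw new IllegalArgumentException("Invalid parallelism: " + parallelism);
        }
        this.parallelism = parallelism;
        return this;
    }

    OperatorSettings setMaxParallelism(int maxParallelism) {
        // Mirrors setParallelism(): values other than 1 are rejected for non-parallel operators.
        if (maxParallelism < 1 || (nonParallel && maxParallelism != 1)) {
            throw new IllegalArgumentException("Invalid max parallelism: " + maxParallelism);
        }
        this.maxParallelism = maxParallelism;
        return this;
    }

    int getParallelism() {
        return parallelism;
    }

    /** A global default (e.g. from the environment) never applies to a non-parallel operator. */
    int effectiveMaxParallelism(int globalDefault) {
        if (nonParallel) {
            return 1;
        }
        return maxParallelism != VALUE_NOT_SET ? maxParallelism : globalDefault;
    }
}
```

Under this model, setting a global max parallelism equal to the parallelism of the old 1.1 job would no longer assign that value to non-parallel operators, so they would not need a manual override.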



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834714#comment-15834714
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3182#discussion_r97337263
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 ---
@@ -57,47 +57,51 @@
 
/** Use the same log for all ExecutionGraph classes */
private static final Logger LOG = ExecutionGraph.LOG;
-   
-   private final SerializableObject stateMonitor = new SerializableObject();
+
+   public static final int VALUE_NOT_SET = -1;
+
+   private final Object stateMonitor = new Object();

private final ExecutionGraph graph;

private final JobVertex jobVertex;

private final ExecutionVertex[] taskVertices;
 
-   private IntermediateResult[] producedDataSets;
+   private final IntermediateResult[] producedDataSets;

private final List inputs;

private final int parallelism;
 
-   private final int maxParallelism;
-   
private final boolean[] finishedSubtasks;
-   
-   private volatile int numSubtasksInFinalState;
-   
+
private final SlotSharingGroup slotSharingGroup;
-   
+
private final CoLocationGroup coLocationGroup;
-   
+
private final InputSplit[] inputSplits;
 
+   private final int maxParallelismConfigured;
+
+   private int maxParallelismDerived;
+
+   private volatile int numSubtasksInFinalState;
+
/**
 * Serialized task information which is for all sub tasks the same. Thus, it avoids to
 * serialize the same information multiple times in order to create the
 * TaskDeploymentDescriptors.
 */
-   private final SerializedValue serializedTaskInformation;
+   private SerializedValue serializedTaskInformation;
 
private InputSplitAssigner splitAssigner;

public ExecutionJobVertex(
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
-   Time timeout) throws JobException, IOException {
+   Time timeout) throws JobException {
--- End diff --

You are right, but I kept the indentation to avoid formatting changes.


[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834718#comment-15834718
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3182
  
Thanks for the review, @tillrohrmann! I followed all of your suggestions, 
except for the indentation formatting.


[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834376#comment-15834376
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3182#discussion_r97281937
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -45,29 +45,37 @@
 
private final Logger logger;
private final Map tasks;
-   private final CompletedCheckpoint latest;
+   private final Map taskStates;
private final boolean allowNonRestoredState;
 
public StateAssignmentOperation(
Logger logger,
Map tasks,
-   CompletedCheckpoint latest,
+   Map taskStates,
boolean allowNonRestoredState) {
 
this.logger = logger;
this.tasks = tasks;
-   this.latest = latest;
+   this.taskStates = taskStates;
this.allowNonRestoredState = allowNonRestoredState;
}
 
public boolean assignStates() throws Exception {
--- End diff --

This method seems a bit lengthy. Maybe we could split it up.


[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834375#comment-15834375
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3182#discussion_r97280170
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -45,29 +45,37 @@
 
private final Logger logger;
private final Map tasks;
-   private final CompletedCheckpoint latest;
+   private final Map taskStates;
private final boolean allowNonRestoredState;
 
public StateAssignmentOperation(
Logger logger,
Map tasks,
-   CompletedCheckpoint latest,
+   Map taskStates,
boolean allowNonRestoredState) {
 
this.logger = logger;
this.tasks = tasks;
-   this.latest = latest;
+   this.taskStates = taskStates;
this.allowNonRestoredState = allowNonRestoredState;
--- End diff --

`Precondition` checks could be helpful here.
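Below is a minimal sketch of such checks, written as a simplified, hypothetical stand-in for the constructor in the diff. The map types are placeholders because the generic parameters are not visible in the quoted diff. `Objects.requireNonNull` from the JDK is used so that the example compiles on its own. Inside Flink, `Preconditions.checkNotNull` would be the usual equivalent.

```java
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical, simplified stand-in for the StateAssignmentOperation constructor.
 * The map value types are placeholders; Flink code would typically use
 * Preconditions.checkNotNull instead of Objects.requireNonNull.
 */
final class StateAssignmentSketch {
    private final Object logger;
    private final Map<String, ?> tasks;
    private final Map<String, ?> taskStates;
    private final boolean allowNonRestoredState;

    StateAssignmentSketch(
            Object logger,
            Map<String, ?> tasks,
            Map<String, ?> taskStates,
            boolean allowNonRestoredState) {

        // Fail fast here rather than with a NullPointerException later in assignStates().
        this.logger = Objects.requireNonNull(logger, "logger");
        this.tasks = Objects.requireNonNull(tasks, "tasks");
        this.taskStates = Objects.requireNonNull(taskStates, "taskStates");
        this.allowNonRestoredState = allowNonRestoredState;
    }

    int numTasks() {
        return tasks.size();
    }

    int numTaskStates() {
        return taskStates.size();
    }

    boolean isAllowNonRestoredState() {
        return allowNonRestoredState;
    }
}
```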


[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834373#comment-15834373
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3182#discussion_r97293150
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 ---
@@ -230,9 +237,46 @@ public int getParallelism() {
return parallelism;
}
 
+   /**
+* Returns the effective max parallelism. This value is determined in the following order of priority:
+* 
+* (maxParallelismConfigured) overrides (maxParallelismOverride) override (max(128, roundUp(parallelism)) / default)
--- End diff --

`maxParallelismOverride` => `maxParallelismDerived`?
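The priority order in the javadoc can be illustrated with a small, hypothetical Java sketch that uses the suggested name `maxParallelismDerived`. It assumes that `roundUp` means rounding up to the next power of two. It also treats -1 as "not set", mirroring `VALUE_NOT_SET` in the diff. The real default computation in Flink may differ, for example by also clamping to an upper bound.

```java
/**
 * Hypothetical sketch of the documented priority:
 * configured > derived > max(128, roundUp(parallelism)).
 */
final class MaxParallelismResolver {
    static final int VALUE_NOT_SET = -1;
    static final int DEFAULT_LOWER_BOUND = 128;

    /** Assumption: "roundUp" rounds up to the next power of two (valid for 1 <= x <= 2^30). */
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    static int effectiveMaxParallelism(int configured, int derived, int parallelism) {
        if (configured != VALUE_NOT_SET) {
            return configured; // set explicitly by the user: always wins
        }
        if (derived != VALUE_NOT_SET) {
            return derived; // e.g. derived from restored savepoint state
        }
        return Math.max(DEFAULT_LOWER_BOUND, roundUpToPowerOfTwo(parallelism));
    }
}
```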


[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834372#comment-15834372
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3182#discussion_r97282806
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 ---
@@ -212,6 +206,19 @@ public ExecutionJobVertex(
finishedSubtasks = new boolean[parallelism];
}
 
+   public void setMaxParallelismDerived(int maxParallelism) {
+
+   Preconditions.checkState(VALUE_NOT_SET == maxParallelismConfigured,
+   "Attempt to override a configured max parallelism. Configured: " + maxParallelismConfigured
+   + ", argument: " + maxParallelism);
+
+   Preconditions.checkArgument(maxParallelism > 0
+   && maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+   "Overriding max parallelism is not in valid bounds: " + maxParallelism);
--- End diff --

Maybe we could add the valid bounds here.
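One way to act on this suggestion is shown below in a hypothetical stand-in class, not the actual `ExecutionJobVertex`. The error message states the accepted range. The upper bound `1 << 15` is an assumption and should be checked against `KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM`. Plain JDK exceptions replace `Preconditions.checkState` and `checkArgument`, so the sketch compiles without Flink.

```java
/**
 * Hypothetical stand-in for ExecutionJobVertex: the bounds check reports the valid range.
 */
final class JobVertexSketch {
    static final int VALUE_NOT_SET = -1;
    // Assumed value; see KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM in Flink.
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;

    private final int maxParallelismConfigured;
    private int maxParallelismDerived = VALUE_NOT_SET;

    JobVertexSketch(int maxParallelismConfigured) {
        this.maxParallelismConfigured = maxParallelismConfigured;
    }

    void setMaxParallelismDerived(int maxParallelism) {
        // A value configured by the user must never be overridden.
        if (maxParallelismConfigured != VALUE_NOT_SET) {
            throw new IllegalStateException("Attempt to override a configured max parallelism. Configured: "
                    + maxParallelismConfigured + ", argument: " + maxParallelism);
        }
        // Include the accepted range in the message, as suggested in the review.
        if (maxParallelism <= 0 || maxParallelism > UPPER_BOUND_MAX_PARALLELISM) {
            throw new IllegalArgumentException("Overriding max parallelism is not in valid bounds [1, "
                    + UPPER_BOUND_MAX_PARALLELISM + "]: " + maxParallelism);
        }
        this.maxParallelismDerived = maxParallelism;
    }

    int getMaxParallelismDerived() {
        return maxParallelismDerived;
    }
}
```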


[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834377#comment-15834377
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3182#discussion_r97282598
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 ---
@@ -57,47 +57,51 @@
 
/** Use the same log for all ExecutionGraph classes */
private static final Logger LOG = ExecutionGraph.LOG;
-   
-   private final SerializableObject stateMonitor = new SerializableObject();
+
+   public static final int VALUE_NOT_SET = -1;
+
+   private final Object stateMonitor = new Object();

private final ExecutionGraph graph;

private final JobVertex jobVertex;

private final ExecutionVertex[] taskVertices;
 
-   private IntermediateResult[] producedDataSets;
+   private final IntermediateResult[] producedDataSets;

private final List inputs;

private final int parallelism;
 
-   private final int maxParallelism;
-   
private final boolean[] finishedSubtasks;
-   
-   private volatile int numSubtasksInFinalState;
-   
+
private final SlotSharingGroup slotSharingGroup;
-   
+
private final CoLocationGroup coLocationGroup;
-   
+
private final InputSplit[] inputSplits;
 
+   private final int maxParallelismConfigured;
+
+   private int maxParallelismDerived;
+
+   private volatile int numSubtasksInFinalState;
+
/**
 * Serialized task information which is for all sub tasks the same. Thus, it avoids to
 * serialize the same information multiple times in order to create the
 * TaskDeploymentDescriptors.
 */
-   private final SerializedValue serializedTaskInformation;
+   private SerializedValue serializedTaskInformation;
 
private InputSplitAssigner splitAssigner;

public ExecutionJobVertex(
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
-   Time timeout) throws JobException, IOException {
+   Time timeout) throws JobException {
--- End diff --

Method declaration parameters which are broken into multiple lines are 
usually indented twice.


[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834374#comment-15834374
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3182#discussion_r97283204
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ---
@@ -599,7 +602,24 @@ TaskDeploymentDescriptor createDeploymentDescriptor(
boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment();
 
for (IntermediateResultPartition partition : resultPartitions.values()) {
-   producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, lazyScheduling));
+
+   List> consumers = partition.getConsumers();
+
+   if(consumers.isEmpty()) {
--- End diff --

whitespace missing between `if` and `(`


[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15834378#comment-15834378
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/3182#discussion_r97286408
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ---
@@ -94,6 +95,9 @@ public OperatorChain(StreamTask containingTask) {
try {
for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge outEdge = outEdgesInOrder.get(i);
+
+
+
--- End diff --

One line break would probably be enough here.


[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831941#comment-15831941
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3182
  
cc @uce 


[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831939#comment-15831939
 ] 

ASF GitHub Bot commented on FLINK-5473:
---

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/3182

[FLINK-5473] Limit MaxParallelism to 1 for non-parallel operators and improve choice of max parallelism without explicit configuration

This PR limits the maximum parallelism for non-parallel operators to 1.

Furthermore, this improves the default behaviour if the user did not 
explicitly specify a maximum parallelism. In particular, maximum parallelism 
can now be derived from savepoints, allowing users that migrate from Flink 1.1 
to Flink 1.2 to keep their job unchanged.
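The savepoint-based derivation can be sketched as follows. This is a hypothetical illustration, not the code in this PR. It assumes that for state written by Flink 1.1, the old job's parallelism is what should become the max parallelism, as suggested in the earlier comments on this issue. Vertex IDs are plain strings. Vertices without restored state are left unset so that the regular default can apply later. A configured value that disagrees with the savepoint is reported as an error instead of being silently overridden.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch, not the PR's implementation: derive max parallelism
 * from restored savepoint state for every vertex the user did not configure.
 */
final class SavepointMaxParallelismDerivation {
    static final int VALUE_NOT_SET = -1;

    /**
     * @param configured user-configured max parallelism per vertex ID, or VALUE_NOT_SET
     * @param restored   max parallelism found in the savepoint per vertex ID (for 1.1
     *                   state, assumed to be the old job's parallelism)
     */
    static Map<String, Integer> derive(Map<String, Integer> configured, Map<String, Integer> restored) {
        Map<String, Integer> result = new HashMap<>();
        for (Map.Entry<String, Integer> entry : configured.entrySet()) {
            String vertexId = entry.getKey();
            int userValue = entry.getValue();
            Integer fromSavepoint = restored.get(vertexId);

            if (fromSavepoint == null) {
                // No restored state: keep the user's value (or VALUE_NOT_SET, so a default applies later).
                result.put(vertexId, userValue);
            } else if (userValue == VALUE_NOT_SET) {
                // Not configured: take the value from the savepoint.
                result.put(vertexId, fromSavepoint);
            } else if (userValue != fromSavepoint) {
                throw new IllegalStateException("Configured max parallelism " + userValue
                        + " of vertex " + vertexId + " does not match the savepoint value " + fromSavepoint);
            } else {
                result.put(vertexId, userValue);
            }
        }
        return result;
    }
}
```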


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink recover-max-para

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3182


commit 20d0a3fc88c85a71b692c5408fc7b2fd33da8ff2
Author: Stefan Richter 
Date:   2017-01-16T13:31:22Z

[FLINK-5473] Limit max parallelism to 1 for non-parallel operators

commit f6081f319b7f2ef8615557743d906bc4585445f7
Author: Stefan Richter 
Date:   2017-01-16T17:41:37Z

[FLINK-5473] Better default behaviours for unspecified maximum parallelism




[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-12 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15821262#comment-15821262
 ] 

Robert Metzger commented on FLINK-5473:
---

That would be the best user experience, yes.

[jira] [Commented] (FLINK-5473) setMaxParallelism() higher than 1 is possible on non-parallel operators

2017-01-12 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15821254#comment-15821254
 ] 

Stefan Richter commented on FLINK-5473:
---

We could try to automatically set the old parallelism as max parallelism by 
default.
