KarmaGYZ commented on a change in pull request #11855:
URL: https://github.com/apache/flink/pull/11855#discussion_r412789878



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateResultPartitionID.java
##########
@@ -27,29 +28,76 @@
 /**
  * Id identifying {@link IntermediateResultPartition}.
  */
-public class IntermediateResultPartitionID extends AbstractID implements 
ResultID {
+public class IntermediateResultPartitionID implements 
Comparable<IntermediateResultPartitionID>, ResultID, java.io.Serializable {
 
        private static final long serialVersionUID = 1L;
 
+       private final IntermediateDataSetID intermediateDataSetID;
+       private final int partitionNum;
+
        /**
-        * Creates an new random intermediate result partition ID.
+        * Creates an new random intermediate result partition ID for testing.
         */
+       @VisibleForTesting
        public IntermediateResultPartitionID() {
-               super();
+               this.partitionNum = -1;
+               this.intermediateDataSetID = new IntermediateDataSetID(new 
AbstractID());
        }
 
-       public IntermediateResultPartitionID(long lowerPart, long upperPart) {
-               super(lowerPart, upperPart);
+       /**
+        * Creates an new intermediate result partition ID with {@link 
IntermediateDataSetID} and the partitionNum.
+        */
+       public IntermediateResultPartitionID(IntermediateDataSetID 
intermediateDataSetID, int partitionNum) {
+               this.intermediateDataSetID = intermediateDataSetID;
+               this.partitionNum = partitionNum;
        }
 
        public void writeTo(ByteBuf buf) {
-               buf.writeLong(this.lowerPart);
-               buf.writeLong(this.upperPart);
+               buf.writeLong(intermediateDataSetID.getLowerPart());
+               buf.writeLong(intermediateDataSetID.getUpperPart());
+               buf.writeInt(partitionNum);
        }
 
        public static IntermediateResultPartitionID fromByteBuf(ByteBuf buf) {
-               long lower = buf.readLong();
-               long upper = buf.readLong();
-               return new IntermediateResultPartitionID(lower, upper);
+               final long lower = buf.readLong();
+               final long upper = buf.readLong();
+               final int partitionNum = buf.readInt();
+               final IntermediateDataSetID intermediateDataSetID = new 
IntermediateDataSetID(new AbstractID(lower, upper));
+               return new IntermediateResultPartitionID(intermediateDataSetID, 
partitionNum);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               } else if (obj != null && obj.getClass() == getClass()) {
+                       IntermediateResultPartitionID that = 
(IntermediateResultPartitionID) obj;
+                       return that.intermediateDataSetID.getLowerPart() == 
this.intermediateDataSetID.getLowerPart()

Review comment:
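       I think you mean `intermediateDataSetID#equals` here, i.e. delegate to the `equals` of the `intermediateDataSetID` field instead of comparing its lower and upper parts by hand.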

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateResultPartitionID.java
##########
@@ -27,29 +28,76 @@
 /**
  * Id identifying {@link IntermediateResultPartition}.
  */
-public class IntermediateResultPartitionID extends AbstractID implements 
ResultID {
+public class IntermediateResultPartitionID implements 
Comparable<IntermediateResultPartitionID>, ResultID, java.io.Serializable {
 
        private static final long serialVersionUID = 1L;
 
+       private final IntermediateDataSetID intermediateDataSetID;
+       private final int partitionNum;
+
        /**
-        * Creates an new random intermediate result partition ID.
+        * Creates an new random intermediate result partition ID for testing.
         */
+       @VisibleForTesting
        public IntermediateResultPartitionID() {
-               super();
+               this.partitionNum = -1;
+               this.intermediateDataSetID = new IntermediateDataSetID(new 
AbstractID());
        }
 
-       public IntermediateResultPartitionID(long lowerPart, long upperPart) {
-               super(lowerPart, upperPart);
+       /**
+        * Creates an new intermediate result partition ID with {@link 
IntermediateDataSetID} and the partitionNum.
+        */
+       public IntermediateResultPartitionID(IntermediateDataSetID 
intermediateDataSetID, int partitionNum) {
+               this.intermediateDataSetID = intermediateDataSetID;
+               this.partitionNum = partitionNum;
        }
 
        public void writeTo(ByteBuf buf) {
-               buf.writeLong(this.lowerPart);
-               buf.writeLong(this.upperPart);
+               buf.writeLong(intermediateDataSetID.getLowerPart());
+               buf.writeLong(intermediateDataSetID.getUpperPart());
+               buf.writeInt(partitionNum);
        }
 
        public static IntermediateResultPartitionID fromByteBuf(ByteBuf buf) {
-               long lower = buf.readLong();
-               long upper = buf.readLong();
-               return new IntermediateResultPartitionID(lower, upper);
+               final long lower = buf.readLong();
+               final long upper = buf.readLong();
+               final int partitionNum = buf.readInt();
+               final IntermediateDataSetID intermediateDataSetID = new 
IntermediateDataSetID(new AbstractID(lower, upper));
+               return new IntermediateResultPartitionID(intermediateDataSetID, 
partitionNum);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == this) {
+                       return true;
+               } else if (obj != null && obj.getClass() == getClass()) {
+                       IntermediateResultPartitionID that = 
(IntermediateResultPartitionID) obj;
+                       return that.intermediateDataSetID.getLowerPart() == 
this.intermediateDataSetID.getLowerPart()
+                               && that.intermediateDataSetID.getUpperPart() == 
this.intermediateDataSetID.getUpperPart()
+                               && that.partitionNum == this.partitionNum;
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return ((int)  this.intermediateDataSetID.getLowerPart()) ^

Review comment:
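       Ditto: I think you mean `intermediateDataSetID#hashCode` here, instead of combining the lower and upper parts by hand.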




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to