[ https://issues.apache.org/jira/browse/FLINK-2590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martin Junghanns updated FLINK-2590: ------------------------------------ Description: The function creates IDs using the following code: {code} shifter = log2(numberOfParallelSubtasks) id = counter << shifter + taskId; }} As the binary function (+) is executed before the bitshift (<<), this results in cases where different tasks create the same ID. It essentially calculates {{ counter*2^(shifter+taskId) }} which is 0 for counter = 0 and all values of shifter and taskID. Consider the following example. numberOfParallelSubtaks = 8 shifter = log2(8) = 4 (maybe rename the function?) produces: {{ start: 1, shifter: 4 taskId: 4 label: 256 start: 2, shifter: 4 taskId: 3 label: 256 start: 4, shifter: 4 taskId: 2 label: 256 }} I would suggest the following: {{ counter*2^(shifter)+taskId }} which in code is equivalent to {{ shifter = log2(numberOfParallelSubtasks); id = (counter << shifter) + taskId; }} and for our example produces: {{ start: 1, shifter: 4 taskId: 4 label: 20 start: 2, shifter: 4 taskId: 3 label: 35 start: 4, shifter: 4 taskId: 2 label: 66 }} So we move the counter to the left and add the task id. As there is space for 2^shifter numbers, this prevents collisions. was: The function creates IDs using the following code: {{ shifter = log2(numberOfParallelSubtasks) id = counter << shifter + taskId; }} As the binary function (+) is executed before the bitshift (<<), this results in cases where different tasks create the same ID. It essentially calculates {{ counter*2^(shifter+taskId) }} which is 0 for counter = 0 and all values of shifter and taskID. Consider the following example. numberOfParallelSubtaks = 8 shifter = log2(8) = 4 (maybe rename the function?) produces: {{ start: 1, shifter: 4 taskId: 4 label: 256 start: 2, shifter: 4 taskId: 3 label: 256 start: 4, shifter: 4 taskId: 2 label: 256 }} I would suggest the following: {{ counter*2^(shifter)+taskId }} which in code is equivalent to {{ shifter = log2(numberOfParallelSubtasks); id = (counter << shifter) + taskId; }} and for our example produces: {{ start: 1, shifter: 4 taskId: 4 label: 20 start: 2, shifter: 4 taskId: 3 label: 35 start: 4, shifter: 4 taskId: 2 label: 66 }} So we move the counter to the left and add the task id. As there is space for 2^shifter numbers, this prevents collisions. > DataSetUtils.zipWithUniqueID creates duplicate IDs > -------------------------------------------------- > > Key: FLINK-2590 > URL: https://issues.apache.org/jira/browse/FLINK-2590 > Project: Flink > Issue Type: Bug > Components: Java API, Scala API > Affects Versions: master > Reporter: Martin Junghanns > Assignee: Martin Junghanns > Priority: Minor > > The function creates IDs using the following code: > {code} > shifter = log2(numberOfParallelSubtasks) > id = counter << shifter + taskId; > }} > As the binary function (+) is executed before the bitshift (<<), this results > in cases where different tasks create the same ID. It essentially calculates > {{ > counter*2^(shifter+taskId) > }} > which is 0 for counter = 0 and all values of shifter and taskID. > Consider the following example. > numberOfParallelSubtaks = 8 > shifter = log2(8) = 4 (maybe rename the function?) > produces: > {{ > start: 1, shifter: 4 taskId: 4 label: 256 > start: 2, shifter: 4 taskId: 3 label: 256 > start: 4, shifter: 4 taskId: 2 label: 256 > }} > I would suggest the following: > {{ > counter*2^(shifter)+taskId > }} > which in code is equivalent to > {{ > shifter = log2(numberOfParallelSubtasks); > id = (counter << shifter) + taskId; > }} > and for our example produces: > {{ > start: 1, shifter: 4 taskId: 4 label: 20 > start: 2, shifter: 4 taskId: 3 label: 35 > start: 4, shifter: 4 taskId: 2 label: 66 > }} > So we move the counter to the left and add the task id. As there is space for > 2^shifter numbers, this prevents collisions. -- This message was sent by Atlassian JIRA (v6.3.4#6332)