[
https://issues.apache.org/jira/browse/BEAM-10703?focusedWorklogId=507880&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-507880
]
ASF GitHub Bot logged work on BEAM-10703:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Nov/20 00:28
Start Date: 05/Nov/20 00:28
Worklog Time Spent: 10m
Work Description: nehsyc commented on a change in pull request #13208:
URL: https://github.com/apache/beam/pull/13208#discussion_r517710797
##########
File path:
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -103,43 +156,76 @@ public void process(ProcessContext c) {
}
@Override
- public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K,
Iterable<V>>>>
+ public PTransformReplacement<PCollection<KV<K, V>>,
PCollection<KV<ShardedKey<K>, Iterable<V>>>>
getReplacementTransform(
AppliedPTransform<
- PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>,
GroupIntoBatches<K, V>>
+ PCollection<KV<K, V>>,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+ GroupIntoBatches<K, V>.WithShardedKey>
transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
- new StreamingGroupIntoBatches(runner, transform.getTransform()));
+ new StreamingGroupIntoBatchesWithShardedKey<>(runner,
transform.getTransform()));
}
@Override
public Map<PCollection<?>, ReplacementOutput> mapOutputs(
- Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<K,
Iterable<V>>> newOutput) {
+ Map<TupleTag<?>, PCollection<?>> outputs,
+ PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
}
/**
- * Specialized implementation of {@link GroupIntoBatches} for unbounded
Dataflow pipelines. The
- * override does the same thing as the original transform but additionally
record the input to add
- * corresponding properties during the graph translation.
+ * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for
unbounded Dataflow
+ * pipelines. The override does the same thing as the original transform but
additionally records
+ * the input of {@code GroupIntoBatchesDoFn} in order to append relevant
step properties during
+ * the graph translation.
*/
- static class StreamingGroupIntoBatches<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
Iterable<V>>>> {
+ static class StreamingGroupIntoBatchesWithShardedKey<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>,
Iterable<V>>>> {
private final transient DataflowRunner runner;
- private final GroupIntoBatches<K, V> original;
+ private final GroupIntoBatches<K, V>.WithShardedKey original;
- public StreamingGroupIntoBatches(DataflowRunner runner,
GroupIntoBatches<K, V> original) {
+ public StreamingGroupIntoBatchesWithShardedKey(
+ DataflowRunner runner, GroupIntoBatches<K, V>.WithShardedKey original)
{
this.runner = runner;
this.original = original;
}
@Override
- public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input)
{
- runner.maybeRecordPCollectionWithAutoSharding(input);
- return input.apply(original);
+ public PCollection<KV<ShardedKey<K>, Iterable<V>>>
expand(PCollection<KV<K, V>> input) {
+ PCollection<KV<ShardedKey<K>, V>> intermediate_input = ShardKeys(input);
+
+ runner.maybeRecordPCollectionWithAutoSharding(intermediate_input);
+
+ if (original.getMaxBufferingDuration() != null) {
Review comment:
Also, the original now shard the input based on thread ids. For the
override, we probably don't want to pre-shard the input in SDK.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 507880)
Time Spent: 9.5h (was: 9h 20m)
> Add support for auto-sharded GroupIntoBatches in Dataflow runner
> ----------------------------------------------------------------
>
> Key: BEAM-10703
> URL: https://issues.apache.org/jira/browse/BEAM-10703
> Project: Beam
> Issue Type: Improvement
> Components: runner-dataflow
> Reporter: Siyuan Chen
> Assignee: Siyuan Chen
> Priority: P1
> Time Spent: 9.5h
> Remaining Estimate: 0h
>
> The proposal of improving GroupIntoBatches transform is in BEAM-10475
> This tracks the support in Cloud Dataflow Runner.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)