scwhittle commented on code in PR #38783:
URL: https://github.com/apache/beam/pull/38783#discussion_r3362389609


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java:
##########
@@ -457,6 +469,37 @@ private void addErrorCollections(
     }
   }
 
+  private static class AddShardKeyFn<DestT, ElemT>
+      extends DoFn<
+          KV<DestT, StorageApiWritePayload>, KV<Integer, KV<DestT, 
StorageApiWritePayload>>> {
+
+    private final StorageApiDynamicDestinations<ElemT, DestT> 
dynamicDestinations;
+    private final int numShards;
+
+    public AddShardKeyFn(
+        StorageApiDynamicDestinations<ElemT, DestT> dynamicDestinations, int 
numShards) {
+      this.dynamicDestinations = dynamicDestinations;
+      this.numShards = numShards;
+    }
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c,
+        @Element KV<DestT, StorageApiWritePayload> element,
+        OutputReceiver<KV<Integer, KV<DestT, StorageApiWritePayload>>> 
outputReceiver) {
+      dynamicDestinations.setSideInputAccessorFromProcessContext(c);
+
+      String tableUrn = 
dynamicDestinations.getTable(element.getKey()).getShortTableUrn();
+
+      int hash = Hashing.murmur3_32_fixed().hashString(tableUrn, 
StandardCharsets.UTF_8).asInt();
+
+      int shardKey =
+          Math.floorMod(hash + ThreadLocalRandom.current().nextInt(numShards), 
numShards);

Review Comment:
   can you add a comment on what you are trying to do?
   as is I don't see what the hashing is doing because we add a random # to it 
that is the same size as what we are modding.
   
   if you want a given table to go to some particular key, then we shouldn't 
have the randomness. If you want it to go to some subset of keys then it seems 
like the random nextInt should be less than numShards.
   
   
   Would be good to have a test verifying the behavior
   



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java:
##########
@@ -379,12 +382,21 @@ public WriteResult expandUntriggered(
     PCollection<KV<DestinationT, StorageApiWritePayload>> 
successfulConvertedRows =
         convertMessagesResult.get(successfulConvertedRowsTag);
 
-    if (numShards > 0) {
+    if (numShards > 0 && input.isBounded() == PCollection.IsBounded.UNBOUNDED) 
{

Review Comment:
   I think this should check if the pipeline is in streaming mode or not 
instead, as streaming pipelines can process bounded pcollections.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to