arnavarora2004 commented on code in PR #35435:
URL: https://github.com/apache/beam/pull/35435#discussion_r2213691940


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java:
##########
@@ -135,18 +140,186 @@ public PCollectionRowTuple expand(PCollectionRowTuple 
input) {
           String.format(
               "Could not find expected input [%s] to %s.", INPUT_TAG, 
getClass().getSimpleName()));
 
-      PCollection<Row> beamRowMutations = input.get(INPUT_TAG);
-      PCollection<KV<ByteString, Iterable<Mutation>>> bigtableMutations =
-          beamRowMutations.apply(MapElements.via(new 
GetMutationsFromBeamRow()));
+      Schema testOriginialSchema =
+          Schema.builder()
+              .addByteArrayField("key")
+              .addArrayField(
+                  "mutations",
+                  Schema.FieldType.map(Schema.FieldType.STRING, 
Schema.FieldType.BYTES))
+              .build();
 
-      bigtableMutations.apply(
-          BigtableIO.write()
-              .withTableId(configuration.getTableId())
-              .withInstanceId(configuration.getInstanceId())
-              .withProjectId(configuration.getProjectId()));
+      Schema inputSchema = input.getSinglePCollection().getSchema();
 
+      System.out.println("Input Schema for BigTableMutations: " + inputSchema);
+
+      PCollection<KV<ByteString, Iterable<Mutation>>> bigtableMutations = null;
+      if (inputSchema.equals(testOriginialSchema)) {
+        PCollection<Row> beamRowMutations = input.get(INPUT_TAG);
+        bigtableMutations =
+            beamRowMutations.apply(
+                // Original schema inputs gets sent out to the original 
transform provider mutations
+                // function
+                MapElements.via(
+                    new 
BigtableWriteSchemaTransformProvider.GetMutationsFromBeamRow()));
+      } else if (inputSchema.hasField("type")) {
+        bigtableMutations = changeMutationInput(input);
+      } else {
+        System.out.println(
+            "Inputted Schema is Invalid; the schema should be formatted in one 
of two ways:\n "
+                + "key\": ByteString\n"
+                + "\"type\": String\n"
+                + "\"column_qualifier\": ByteString\n"
+                + "\"family_name\": ByteString\n"
+                + "\"timestamp_micros\": Long\n"
+                + "\"start_timestamp_micros\": Long\n"
+                + "\"end_timestamp_micros\": Long"
+                + "OR\n"
+                + "\n"
+                + "\"key\": ByteString\n"
+                + "(\"mutations\", contains map(String, ByteString) of 
mutations in the mutation schema format");
+      }
+
+      if (bigtableMutations != null) {
+        bigtableMutations.apply(
+            BigtableIO.write()
+                .withTableId(configuration.getTableId())
+                .withInstanceId(configuration.getInstanceId())
+                .withProjectId(configuration.getProjectId()));
+      } else {
+        throw new RuntimeException(
+            "Inputted Schema caused mutation error, check error logs and input 
schema format");
+      }
       return PCollectionRowTuple.empty(input.getPipeline());
     }
+
+    public PCollection<KV<ByteString, Iterable<Mutation>>> changeMutationInput(
+        PCollectionRowTuple inputR) {
+      PCollection<Row> beamRowMutationsList = inputR.getSinglePCollection();
+      // convert all row inputs into KV<ByteString, Mutation>
+      PCollection<KV<ByteString, Mutation>> changedBeamRowMutationsList =
+          beamRowMutationsList.apply(
+              MapElements.into(
+                      TypeDescriptors.kvs(
+                          TypeDescriptor.of(ByteString.class), 
TypeDescriptor.of(Mutation.class)))
+                  .via(
+                      (Row input) -> {
+                        @SuppressWarnings("nullness")
+                        ByteString key =
+                            ByteString.copyFrom(
+                                Preconditions.checkStateNotNull(
+                                    input.getBytes("key"),
+                                    "Encountered row with incorrect 'key' 
property."));
+
+                        Mutation bigtableMutation;
+                        String mutationType =
+                            input.getString("type"); // Direct call, can 
return null
+                        if (mutationType == null) {
+                          throw new IllegalArgumentException("Mutation type 
cannot be null.");
+                        }
+                        switch (mutationType) {
+                          case "SetCell":
+                            @SuppressWarnings("nullness")
+                            Mutation.SetCell.Builder setMutation =
+                                Mutation.SetCell.newBuilder()
+                                    .setValue(
+                                        ByteString.copyFrom(
+                                            Preconditions.checkStateNotNull(
+                                                input.getBytes("value"),
+                                                "Encountered SetCell mutation 
with incorrect 'family_name' property.")))
+                                    .setColumnQualifier(
+                                        ByteString.copyFrom(
+                                            Preconditions.checkStateNotNull(
+                                                
input.getBytes("column_qualifier"),
+                                                "Encountered SetCell mutation 
with incorrect 'column_qualifier' property. ")))
+                                    .setFamilyNameBytes(
+                                        ByteString.copyFrom(
+                                            Preconditions.checkStateNotNull(
+                                                input.getBytes("family_name"),
+                                                "Encountered SetCell mutation 
with incorrect 'family_name' property.")));
+                            // Use timestamp if provided, else default to -1 
(current
+                            // Bigtable
+                            // server time)
+                            // Timestamp (optional, assuming Long type in Row 
schema)
+                            Long timestampMicros = 
input.getInt64("timestamp_micros");
+                            setMutation.setTimestampMicros(
+                                timestampMicros != null ? timestampMicros : 
-1);
+
+                            bigtableMutation =
+                                
Mutation.newBuilder().setSetCell(setMutation.build()).build();
+                            break;
+                          case "DeleteFromColumn":
+                            // set timestamp range if applicable
+                            @SuppressWarnings("nullness")
+                            Mutation.DeleteFromColumn.Builder deleteMutation =
+                                Mutation.DeleteFromColumn.newBuilder()
+                                    .setColumnQualifier(
+                                        ByteString.copyFrom(
+                                            Preconditions.checkStateNotNull(
+                                                
input.getBytes("column_qualifier"),
+                                                "Encountered DeleteFromColumn 
mutation with incorrect 'column_qualifier' property.")))
+                                    .setFamilyNameBytes(
+                                        ByteString.copyFrom(
+                                            Preconditions.checkStateNotNull(
+                                                input.getBytes("family_name"),
+                                                "Encountered DeleteFromColumn 
mutation with incorrect 'family_name' property.")));
+
+                            // if start or end timestamp provided
+                            // Timestamp Range (optional, assuming Long type 
in Row schema)
+                            Long startTimestampMicros = null;
+                            Long endTimestampMicros = null;
+
+                            if 
(input.getSchema().hasField("start_timestamp_micros")) {
+                              startTimestampMicros = 
input.getInt64("start_timestamp_micros");
+                            }
+                            if 
(input.getSchema().hasField("end_timestamp_micros")) {
+                              endTimestampMicros = 
input.getInt64("end_timestamp_micros");
+                            }
+
+                            if (startTimestampMicros != null || 
endTimestampMicros != null) {
+                              TimestampRange.Builder timeRange = 
TimestampRange.newBuilder();
+                              if (startTimestampMicros != null) {
+                                
timeRange.setStartTimestampMicros(startTimestampMicros);
+                              }
+                              if (endTimestampMicros != null) {
+                                
timeRange.setEndTimestampMicros(endTimestampMicros);
+                              }
+                              deleteMutation.setTimeRange(timeRange.build());
+                            }
+                            bigtableMutation =
+                                Mutation.newBuilder()
+                                    
.setDeleteFromColumn(deleteMutation.build())
+                                    .build();
+                            break;
+                          case "DeleteFromFamily":
+                            bigtableMutation =
+                                Mutation.newBuilder()
+                                    .setDeleteFromFamily(
+                                        Mutation.DeleteFromFamily.newBuilder()
+                                            .setFamilyNameBytes(
+                                                ByteString.copyFrom(
+                                                    
Preconditions.checkStateNotNull(
+                                                        
input.getBytes("family_name"),
+                                                        "Encountered 
DeleteFromFamily mutation with incorrect 'family_name' property.")))
+                                            .build())
+                                    .build();
+                            break;
+                          case "DeleteFromRow":
+                            bigtableMutation =
+                                Mutation.newBuilder()
+                                    
.setDeleteFromRow(Mutation.DeleteFromRow.newBuilder().build())
+                                    .build();
+                            break;
+                          default:
+                            throw new RuntimeException(
+                                String.format(
+                                    "Unexpected mutation type [%s]: %s",
+                                    ((input.getString("type"))), input));

Review Comment:
   sounds good,



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to