This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch release-2.51.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.51.0 by this push:
     new 70f4a1ae26d CP for #28624 into release 2.51.0 (Bigtable Python timestamp bug fix) (#28634)
70f4a1ae26d is described below

commit 70f4a1ae26d71fc89c3f5a99cae4e417cd1ee446
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Sep 27 17:42:18 2023 -0400

    CP for #28624 into release 2.51.0 (Bigtable Python timestamp bug fix) (#28634)
---
 .../BigtableWriteSchemaTransformProvider.java         | 13 +++++++------
 .../BigtableWriteSchemaTransformProviderIT.java       | 19 +++++++++++--------
 sdks/python/apache_beam/io/gcp/bigtableio.py          |  7 +++----
 sdks/python/apache_beam/io/gcp/bigtableio_it_test.py  | 18 ++++++++++++++++++
 4 files changed, 39 insertions(+), 18 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java
index d38bdae2f09..b99b69621a8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java
@@ -179,12 +179,13 @@ public class BigtableWriteSchemaTransformProvider
                     .setColumnQualifier(
                         
ByteString.copyFrom(ofNullable(mutation.get("column_qualifier")).get()))
                     .setFamilyNameBytes(
-                        
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()));
-            if (mutation.containsKey("timestamp_micros")) {
-              setMutation =
-                  setMutation.setTimestampMicros(
-                      
Longs.fromByteArray(ofNullable(mutation.get("timestamp_micros")).get()));
-            }
+                        
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()))
+                    // Use timestamp if provided, else default to -1 (current 
Bigtable server time)
+                    .setTimestampMicros(
+                        mutation.containsKey("timestamp_micros")
+                            ? Longs.fromByteArray(
+                                
ofNullable(mutation.get("timestamp_micros")).get())
+                            : -1);
             bigtableMutation = 
Mutation.newBuilder().setSetCell(setMutation.build()).build();
             break;
           case "DeleteFromColumn":
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java
index 14bb04b0315..1a60fe661b5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java
@@ -154,8 +154,8 @@ public class BigtableWriteSchemaTransformProviderIT {
   public void testSetMutationsExistingColumn() {
     RowMutation rowMutation =
         RowMutation.create(tableId, "key-1")
-            .setCell(COLUMN_FAMILY_NAME_1, "col_a", "val-1-a")
-            .setCell(COLUMN_FAMILY_NAME_2, "col_c", "val-1-c");
+            .setCell(COLUMN_FAMILY_NAME_1, "col_a", 1000, "val-1-a")
+            .setCell(COLUMN_FAMILY_NAME_2, "col_c", 1000, "val-1-c");
     dataClient.mutateRow(rowMutation);
 
     List<Map<String, byte[]>> mutations = new ArrayList<>();
@@ -165,13 +165,15 @@ public class BigtableWriteSchemaTransformProviderIT {
             "type", "SetCell".getBytes(StandardCharsets.UTF_8),
             "value", "new-val-1-a".getBytes(StandardCharsets.UTF_8),
             "column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8),
-            "family_name", 
COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)));
+            "family_name", 
COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8),
+            "timestamp_micros", Longs.toByteArray(2000)));
     mutations.add(
         ImmutableMap.of(
             "type", "SetCell".getBytes(StandardCharsets.UTF_8),
             "value", "new-val-1-c".getBytes(StandardCharsets.UTF_8),
             "column_qualifier", "col_c".getBytes(StandardCharsets.UTF_8),
-            "family_name", 
COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8)));
+            "family_name", 
COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8),
+            "timestamp_micros", Longs.toByteArray(2000)));
     Row mutationRow =
         Row.withSchema(SCHEMA)
             .withFieldValue("key", "key-1".getBytes(StandardCharsets.UTF_8))
@@ -202,10 +204,11 @@ public class BigtableWriteSchemaTransformProviderIT {
             .collect(Collectors.toList());
     assertEquals(2, cellsColA.size());
     assertEquals(2, cellsColC.size());
-    System.out.println(cellsColA);
-    System.out.println(cellsColC);
-    assertEquals("new-val-1-a", cellsColA.get(1).getValue().toStringUtf8());
-    assertEquals("new-val-1-c", cellsColC.get(1).getValue().toStringUtf8());
+    // Bigtable keeps cell history ordered by descending timestamp
+    assertEquals("new-val-1-a", cellsColA.get(0).getValue().toStringUtf8());
+    assertEquals("new-val-1-c", cellsColC.get(0).getValue().toStringUtf8());
+    assertEquals("val-1-a", cellsColA.get(1).getValue().toStringUtf8());
+    assertEquals("val-1-c", cellsColC.get(1).getValue().toStringUtf8());
   }
 
   @Test
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py
index b2b52bd675c..f8534f38ddf 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -252,11 +252,10 @@ class WriteToBigTable(beam.PTransform):
               "type": b'SetCell',
               "family_name": mutation.set_cell.family_name.encode('utf-8'),
               "column_qualifier": mutation.set_cell.column_qualifier,
-              "value": mutation.set_cell.value
+              "value": mutation.set_cell.value,
+              "timestamp_micros": struct.pack(
+                  '>q', mutation.set_cell.timestamp_micros)
           }
-          micros = mutation.set_cell.timestamp_micros
-          if micros > -1:
-            mutation_dict['timestamp_micros'] = struct.pack('>q', micros)
         elif mutation.__contains__("delete_from_column"):
           mutation_dict = {
               "type": b'DeleteFromColumn',
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
index 341f2983c8b..f61e346cff9 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
@@ -223,6 +223,9 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
     row1_col2_cell = Cell(b'val1-2', 200_000_000)
     row2_col1_cell = Cell(b'val2-1', 100_000_000)
     row2_col2_cell = Cell(b'val2-2', 200_000_000)
+    # When setting this cell, we won't set a timestamp. We expect the timestamp
+    # to default to -1, and Bigtable will set it to system time at insertion.
+    row2_col1_no_timestamp = Cell(b'val2-2-notimestamp', time.time())
     # rows sent to write transform
     row1.set_cell(
         'col_fam', b'col-1', row1_col1_cell.value, row1_col1_cell.timestamp)
@@ -232,6 +235,8 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
         'col_fam', b'col-1', row2_col1_cell.value, row2_col1_cell.timestamp)
     row2.set_cell(
         'col_fam', b'col-2', row2_col2_cell.value, row2_col2_cell.timestamp)
+    # don't set a timestamp here. it should default to -1
+    row2.set_cell('col_fam', b'col-no-timestamp', row2_col1_no_timestamp.value)
 
     self.run_pipeline([row1, row2])
 
@@ -249,6 +254,19 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
     self.assertEqual(
         row2_col2_cell, actual_row2.find_cells('col_fam', b'col-2')[0])
 
+    # check mutation that doesn't have a timestamp set is handled properly:
+    self.assertEqual(
+        row2_col1_no_timestamp.value,
+        actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].value)
+    # Bigtable sets timestamp as insertion time, which is later than the
+    # time.time() we set when creating this test case
+    cell_timestamp = actual_row2.find_cells('col_fam',
+                                            b'col-no-timestamp')[0].timestamp
+    self.assertTrue(
+        row2_col1_no_timestamp.timestamp < cell_timestamp,
+        msg="Expected cell with unset timestamp to have ingestion time "
+        f"attached, but was {cell_timestamp}")
+
   def test_delete_cells_mutation(self):
     col_fam = self.table.column_family('col_fam')
     col_fam.create()

Reply via email to