Abacn commented on code in PR #25831:
URL: https://github.com/apache/beam/pull/25831#discussion_r1137333896


##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -150,6 +155,32 @@
  *         .withTableId("table"));
  * }</pre>
  *
+ * <h4>Writing {@link RowMutations}</h4>
+ *
+ * <p>An alternative way to write to HBase is with {@link HBaseIO#writeRowMutations()}, which takes
+ * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs of byte row keys and
+ * {@link RowMutations}.
+ *
+ * <p>This implementation is Dataflow specific. Useful for preserving mutation order if the upstream

Review Comment:
   ```suggestion
    * <p>This implementation is useful for preserving mutation order if the upstream
   ```



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -150,6 +155,32 @@
  *         .withTableId("table"));
  * }</pre>
  *
+ * <h4>Writing {@link RowMutations}</h4>
+ *
+ * <p>An alternative way to write to HBase is with {@link HBaseIO#writeRowMutations()}, which takes
+ * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs of byte row keys and

Review Comment:
   ```suggestion
    * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs of bytes row keys and
   ```



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +796,195 @@ public void populateDisplayData(DisplayData.Builder builder) {
       private transient BufferedMutator mutator;
     }
   }
+
+  public static WriteRowMutations writeRowMutations() {
+    return new WriteRowMutations(null /* Configuration */, "");
+  }
+
+  /** Transformation that writes RowMutation objects to a Hbase table. */
+  public static class WriteRowMutations
+      extends PTransform<PCollection<KV<byte[], RowMutations>>, PCollection<Integer>> {
+
+    /** Writes to the HBase instance indicated by the* given Configuration. */

Review Comment:
   ```suggestion
       /** Writes to the HBase instance indicated by the given Configuration. */
   ```



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +796,195 @@ public void populateDisplayData(DisplayData.Builder builder) {
       private transient BufferedMutator mutator;
     }
   }
+
+  public static WriteRowMutations writeRowMutations() {
+    return new WriteRowMutations(null /* Configuration */, "");
+  }
+
+  /** Transformation that writes RowMutation objects to a Hbase table. */
+  public static class WriteRowMutations
+      extends PTransform<PCollection<KV<byte[], RowMutations>>, PCollection<Integer>> {
+
+    /** Writes to the HBase instance indicated by the* given Configuration. */
+    public WriteRowMutations withConfiguration(Configuration configuration) {
+      checkNotNull(configuration, "configuration cannot be null");
+      return new WriteRowMutations(configuration, tableId);
+    }
+
+    /** Writes to the specified table. */
+    public WriteRowMutations withTableId(String tableId) {
+      checkNotNull(tableId, "tableId cannot be null");
+      return new WriteRowMutations(configuration, tableId);
+    }
+
+    private WriteRowMutations(Configuration configuration, String tableId) {
+      this.configuration = configuration;
+      this.tableId = tableId;
+    }
+
+    @Override
+    public PCollection<Integer> expand(PCollection<KV<byte[], RowMutations>> input) {
+      checkNotNull(configuration, "withConfiguration() is required");
+      checkNotNull(tableId, "withTableId() is required");
+      checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty");
+
+      return input.apply(ParDo.of(new WriteRowMutationsFn(this)));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("configuration", configuration.toString()));
+      builder.add(DisplayData.item("tableId", tableId));
+    }
+
+    public Configuration getConfiguration() {
+      return configuration;
+    }
+
+    public String getTableId() {
+      return tableId;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      WriteRowMutations writeRowMutations = (WriteRowMutations) o;
+      return configuration.toString().equals(writeRowMutations.configuration.toString())
+          && Objects.equals(tableId, writeRowMutations.tableId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(configuration, tableId);
+    }
+
+    /**
+     * The writeReplace method allows the developer to provide a replacement object that will be
+     * serialized instead of the original one. We use this to keep the enclosed class immutable. For
+     * more details on the technique see <a
+     * href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
+     * article</a>.
+     */
+    private Object writeReplace() {
+      return new SerializationProxy(this);
+    }
+
+    private static class SerializationProxy implements Serializable {
+      public SerializationProxy() {}
+
+      public SerializationProxy(WriteRowMutations writeRowMutations) {
+        configuration = writeRowMutations.configuration;
+        tableId = writeRowMutations.tableId;
+      }
+
+      private void writeObject(ObjectOutputStream out) throws IOException {
+        SerializableCoder.of(SerializableConfiguration.class)
+            .encode(new SerializableConfiguration(this.configuration), out);
+
+        StringUtf8Coder.of().encode(this.tableId, out);
+      }
+
+      private void readObject(ObjectInputStream in) throws IOException {
+        this.configuration = SerializableCoder.of(SerializableConfiguration.class).decode(in).get();
+        this.tableId = StringUtf8Coder.of().decode(in);
+      }
+
+      Object readResolve() {
+        return HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId);
+      }
+
+      private Configuration configuration;
+      private String tableId;
+    }
+
+    private final Configuration configuration;
+    private final String tableId;
+
+    /** Function to write row mutations to a hbase table. */
+    private class WriteRowMutationsFn extends DoFn<KV<byte[], RowMutations>, Integer> {
+
+      public WriteRowMutationsFn(
+          WriteRowMutations writeRowMutations) { // , HbaseSharedConnection hbaseSharedConnection) {
+        checkNotNull(writeRowMutations.tableId, "tableId");
+        checkNotNull(writeRowMutations.configuration, "configuration");
+      }
+
+      @Setup
+      public void setup() throws Exception {
+        connection = HBaseSharedConnection.getOrCreate(configuration);
+      }
+
+      @StartBundle
+      public void startBundle(StartBundleContext c) throws IOException {
+        table = connection.getTable(TableName.valueOf(tableId));
+        recordsWritten = 0;
+      }
+
+      @FinishBundle
+      public void finishBundle() throws Exception {
+        if (table != null) {
+          table.close();
+          table = null;
+        }
+
+        LOG.debug("Wrote {} records", recordsWritten);
+      }
+
+      @Teardown
+      public void tearDown() throws Exception {
+
+        if (table != null) {
+          table.close();
+          table = null;
+        }
+
+        HBaseSharedConnection.close();
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        RowMutations mutations = c.element().getValue();
+
+        try {
+          // Use Table instead of BufferedMutator to preserve mutation-ordering
+          table.mutateRow(mutations);
+        } catch (Exception e) {
+          throw new Exception(
+              (String.join(
+                  " ",
+                  "Table",
+                  tableId,
+                  "row",
+                  Bytes.toString(mutations.getRow()),
+                  "mutation failed.",
+                  "\nTable Available/Enabled:",
+                  Boolean.toString(
+                      connection.getAdmin().isTableAvailable(TableName.valueOf(tableId))),
+                  Boolean.toString(
+                      connection.getAdmin().isTableEnabled(TableName.valueOf(tableId))),
+                  "\nConnection Closed/Aborted/Locks:",
+                  Boolean.toString(connection.isClosed()),
+                  Boolean.toString(connection.isAborted()))));
+        }
+
+        // Dummy output so that we can get Dataflow stats for throughput.
+        c.output(1);

Review Comment:
   Dummy output may not be suitable for production code. Either implement WriteResult or use PDone for now (and implement WriteResult as a future task).



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +796,195 @@ public void populateDisplayData(DisplayData.Builder builder) {
       private transient BufferedMutator mutator;
     }
   }
+
+  public static WriteRowMutations writeRowMutations() {
+    return new WriteRowMutations(null /* Configuration */, "");
+  }
+
+  /** Transformation that writes RowMutation objects to a Hbase table. */
+  public static class WriteRowMutations
+      extends PTransform<PCollection<KV<byte[], RowMutations>>, PCollection<Integer>> {
+
+    /** Writes to the HBase instance indicated by the* given Configuration. */
+    public WriteRowMutations withConfiguration(Configuration configuration) {
+      checkNotNull(configuration, "configuration cannot be null");
+      return new WriteRowMutations(configuration, tableId);
+    }
+
+    /** Writes to the specified table. */
+    public WriteRowMutations withTableId(String tableId) {
+      checkNotNull(tableId, "tableId cannot be null");
+      return new WriteRowMutations(configuration, tableId);
+    }
+
+    private WriteRowMutations(Configuration configuration, String tableId) {
+      this.configuration = configuration;
+      this.tableId = tableId;
+    }
+
+    @Override
+    public PCollection<Integer> expand(PCollection<KV<byte[], RowMutations>> input) {
+      checkNotNull(configuration, "withConfiguration() is required");
+      checkNotNull(tableId, "withTableId() is required");
+      checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty");
+
+      return input.apply(ParDo.of(new WriteRowMutationsFn(this)));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(DisplayData.item("configuration", configuration.toString()));
+      builder.add(DisplayData.item("tableId", tableId));
+    }
+
+    public Configuration getConfiguration() {
+      return configuration;
+    }
+
+    public String getTableId() {
+      return tableId;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      WriteRowMutations writeRowMutations = (WriteRowMutations) o;
+      return configuration.toString().equals(writeRowMutations.configuration.toString())
+          && Objects.equals(tableId, writeRowMutations.tableId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(configuration, tableId);
+    }
+
+    /**
+     * The writeReplace method allows the developer to provide a replacement object that will be
+     * serialized instead of the original one. We use this to keep the enclosed class immutable. For
+     * more details on the technique see <a
+     * href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
+     * article</a>.
+     */
+    private Object writeReplace() {
+      return new SerializationProxy(this);
+    }
+
+    private static class SerializationProxy implements Serializable {
+      public SerializationProxy() {}
+
+      public SerializationProxy(WriteRowMutations writeRowMutations) {
+        configuration = writeRowMutations.configuration;
+        tableId = writeRowMutations.tableId;
+      }
+
+      private void writeObject(ObjectOutputStream out) throws IOException {
+        SerializableCoder.of(SerializableConfiguration.class)
+            .encode(new SerializableConfiguration(this.configuration), out);
+
+        StringUtf8Coder.of().encode(this.tableId, out);
+      }
+
+      private void readObject(ObjectInputStream in) throws IOException {
+        this.configuration = SerializableCoder.of(SerializableConfiguration.class).decode(in).get();
+        this.tableId = StringUtf8Coder.of().decode(in);
+      }
+
+      Object readResolve() {
+        return HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId);
+      }
+
+      private Configuration configuration;
+      private String tableId;
+    }
+
+    private final Configuration configuration;
+    private final String tableId;
+
+    /** Function to write row mutations to a hbase table. */
+    private class WriteRowMutationsFn extends DoFn<KV<byte[], RowMutations>, Integer> {
+
+      public WriteRowMutationsFn(
+          WriteRowMutations writeRowMutations) { // , HbaseSharedConnection hbaseSharedConnection) {
+        checkNotNull(writeRowMutations.tableId, "tableId");
+        checkNotNull(writeRowMutations.configuration, "configuration");
+      }
+
+      @Setup
+      public void setup() throws Exception {
+        connection = HBaseSharedConnection.getOrCreate(configuration);
+      }
+
+      @StartBundle
+      public void startBundle(StartBundleContext c) throws IOException {
+        table = connection.getTable(TableName.valueOf(tableId));
+        recordsWritten = 0;
+      }
+
+      @FinishBundle
+      public void finishBundle() throws Exception {
+        if (table != null) {
+          table.close();
+          table = null;
+        }
+
+        LOG.debug("Wrote {} records", recordsWritten);

Review Comment:
   This is always 0; it looks like a line incrementing recordsWritten is missing somewhere?
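
A minimal stdlib-only sketch of the likely fix, assuming the intent is to count successful `mutateRow` calls per bundle: reset the counter when the bundle starts, increment it only after a write returns normally, and report it when the bundle finishes. `RecordsWrittenSketch` is hypothetical, and the `Consumer<String>` merely stands in for `table.mutateRow(...)`; no Beam or HBase types are used.

```java
import java.util.function.Consumer;

/**
 * Hypothetical stand-in for the bundle lifecycle of WriteRowMutationsFn.
 * The Consumer plays the role of table.mutateRow(...) and may throw on failure.
 */
public class RecordsWrittenSketch {
  private final Consumer<String> mutateRow;
  private long recordsWritten;

  public RecordsWrittenSketch(Consumer<String> mutateRow) {
    this.mutateRow = mutateRow;
  }

  /** Mirrors @StartBundle: the counter is reset for every bundle. */
  public void startBundle() {
    recordsWritten = 0;
  }

  /** Mirrors @ProcessElement: the element is counted only if the write did not throw. */
  public void processElement(String rowKey) {
    mutateRow.accept(rowKey);
    recordsWritten++; // the increment the comment above points out is missing
  }

  /** Mirrors @FinishBundle: logs the per-bundle count (and returns it for testing). */
  public long finishBundle() {
    System.out.printf("Wrote %d records%n", recordsWritten);
    return recordsWritten;
  }
}
```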



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -150,6 +155,32 @@
  *         .withTableId("table"));
  * }</pre>
  *
+ * <h4>Writing {@link RowMutations}</h4>
+ *
+ * <p>An alternative way to write to HBase is with {@link HBaseIO#writeRowMutations()}, which takes
+ * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs of byte row keys and
+ * {@link RowMutations}.
+ *
+ * <p>This implementation is Dataflow specific. Useful for preserving mutation order if the upstream
+ * is ordered by row key, as RowMutations will only be applied after previous RowMutations are
+ * successful.
+ *
+ * <p>To configure the sink, you must supply a table id string and a {@link Configuration} to
+ * identify the HBase instance, for example:
+ *
+ * <pre>{@code
+ * Configuration configuration = ...;
+ * PCollection<KV<byte[], RowMutations>> data = ...;
+ *
+ * data.apply("write",
+ *     HBaseIO.writeRowMutations()
+ *         .withConfiguration(configuration)
+ *         .withTableId("table"));
+ * }</pre>
+ *
+ * <p>Note that the transformation emits the number of RowMutations written as an integer after

Review Comment:
   It would be nice to emit some WriteResult (like BigtableIO does). To my knowledge, emitting the number of elements written is not common practice for Beam sinks. The other common practice (PDone, which HBaseIO.Write currently uses) is an antipattern and will be changed in the future.
   
   More info: https://beam.apache.org/documentation/io/io-standards/#general
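
For illustration only, a stdlib-only sketch of what such a result could carry: one immutable value per bundle with the table id and the number of `RowMutations` applied, instead of one dummy `1` per element. `RowMutationsWriteResult` and its methods are hypothetical, not an existing Beam or HBaseIO type.

```java
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical per-bundle write result; not an existing Beam or HBaseIO type. */
public final class RowMutationsWriteResult implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String tableId;
  private final long rowMutationsWritten;

  public RowMutationsWriteResult(String tableId, long rowMutationsWritten) {
    this.tableId = Objects.requireNonNull(tableId, "tableId");
    if (rowMutationsWritten < 0) {
      throw new IllegalArgumentException("rowMutationsWritten must be >= 0");
    }
    this.rowMutationsWritten = rowMutationsWritten;
  }

  public String getTableId() {
    return tableId;
  }

  public long getRowMutationsWritten() {
    return rowMutationsWritten;
  }

  /** Sums two results for the same table, e.g. when aggregating per-bundle results downstream. */
  public RowMutationsWriteResult plus(RowMutationsWriteResult other) {
    if (!tableId.equals(other.tableId)) {
      throw new IllegalArgumentException("cannot combine results for different tables");
    }
    return new RowMutationsWriteResult(tableId, rowMutationsWritten + other.rowMutationsWritten);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof RowMutationsWriteResult)) {
      return false;
    }
    RowMutationsWriteResult that = (RowMutationsWriteResult) o;
    return rowMutationsWritten == that.rowMutationsWritten && tableId.equals(that.tableId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(tableId, rowMutationsWritten);
  }

  @Override
  public String toString() {
    return "RowMutationsWriteResult{tableId=" + tableId
        + ", rowMutationsWritten=" + rowMutationsWritten + "}";
  }
}
```

If something like this were emitted from `@FinishBundle`, note that `FinishBundleContext.output` takes an explicit timestamp and window, and the output `PCollection` would still need a coder (for example `SerializableCoder`, since the sketch implements `Serializable`).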



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -150,6 +155,32 @@
  *         .withTableId("table"));
  * }</pre>
  *
+ * <h4>Writing {@link RowMutations}</h4>
+ *
+ * <p>An alternative way to write to HBase is with {@link HBaseIO#writeRowMutations()}, which takes
+ * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs of byte row keys and
+ * {@link RowMutations}.
+ *
+ * <p>This implementation is Dataflow specific. Useful for preserving mutation order if the upstream

Review Comment:
   Documentation for a specific IO generally does not make runner-specific statements, because the IO connector artifact does not depend on dataflow-runner-java. What makes this Dataflow specific?



-- 
This is an automated message from the Apache Git Service.