Abacn commented on code in PR #25831:
URL: https://github.com/apache/beam/pull/25831#discussion_r1137333896
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -150,6 +155,32 @@
* .withTableId("table"));
* }</pre>
*
+ * <h4>Writing {@link RowMutations}</h4>
+ *
+ * <p>An alternative way to write to HBase is with {@link
HBaseIO#writeRowMutations()}, which takes
+ * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs
of byte row keys and
+ * {@link RowMutations}.
+ *
+ * <p>This implementation is Dataflow specific. Useful for preserving mutation
order if the upstream
Review Comment:
```suggestion
* <p>This implementation is useful for preserving mutation order if the
upstream
```
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -150,6 +155,32 @@
* .withTableId("table"));
* }</pre>
*
+ * <h4>Writing {@link RowMutations}</h4>
+ *
+ * <p>An alternative way to write to HBase is with {@link
HBaseIO#writeRowMutations()}, which takes
+ * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs
of byte row keys and
Review Comment:
```suggestion
* as input a {@link PCollection<KV<byte[], RowMutations>>}, representing
KVs of bytes row keys and
```
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +796,195 @@ public void populateDisplayData(DisplayData.Builder
builder) {
private transient BufferedMutator mutator;
}
}
+
+ public static WriteRowMutations writeRowMutations() {
+ return new WriteRowMutations(null /* Configuration */, "");
+ }
+
+ /** Transformation that writes RowMutation objects to a Hbase table. */
+ public static class WriteRowMutations
+ extends PTransform<PCollection<KV<byte[], RowMutations>>,
PCollection<Integer>> {
+
+ /** Writes to the HBase instance indicated by the* given Configuration. */
Review Comment:
```suggestion
/** Writes to the HBase instance indicated by the given Configuration. */
```
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +796,195 @@ public void populateDisplayData(DisplayData.Builder
builder) {
private transient BufferedMutator mutator;
}
}
+
+ public static WriteRowMutations writeRowMutations() {
+ return new WriteRowMutations(null /* Configuration */, "");
+ }
+
+ /** Transformation that writes RowMutation objects to a Hbase table. */
+ public static class WriteRowMutations
+ extends PTransform<PCollection<KV<byte[], RowMutations>>,
PCollection<Integer>> {
+
+ /** Writes to the HBase instance indicated by the* given Configuration. */
+ public WriteRowMutations withConfiguration(Configuration configuration) {
+ checkNotNull(configuration, "configuration cannot be null");
+ return new WriteRowMutations(configuration, tableId);
+ }
+
+ /** Writes to the specified table. */
+ public WriteRowMutations withTableId(String tableId) {
+ checkNotNull(tableId, "tableId cannot be null");
+ return new WriteRowMutations(configuration, tableId);
+ }
+
+ private WriteRowMutations(Configuration configuration, String tableId) {
+ this.configuration = configuration;
+ this.tableId = tableId;
+ }
+
+ @Override
+ public PCollection<Integer> expand(PCollection<KV<byte[], RowMutations>>
input) {
+ checkNotNull(configuration, "withConfiguration() is required");
+ checkNotNull(tableId, "withTableId() is required");
+ checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty");
+
+ return input.apply(ParDo.of(new WriteRowMutationsFn(this)));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("configuration", configuration.toString()));
+ builder.add(DisplayData.item("tableId", tableId));
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public String getTableId() {
+ return tableId;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ WriteRowMutations writeRowMutations = (WriteRowMutations) o;
+ return
configuration.toString().equals(writeRowMutations.configuration.toString())
+ && Objects.equals(tableId, writeRowMutations.tableId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(configuration, tableId);
+ }
+
+ /**
+ * The writeReplace method allows the developer to provide a replacement
object that will be
+ * serialized instead of the original one. We use this to keep the
enclosed class immutable. For
+ * more details on the technique see <a
+ *
href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
+ * article</a>.
+ */
+ private Object writeReplace() {
+ return new SerializationProxy(this);
+ }
+
+ private static class SerializationProxy implements Serializable {
+ public SerializationProxy() {}
+
+ public SerializationProxy(WriteRowMutations writeRowMutations) {
+ configuration = writeRowMutations.configuration;
+ tableId = writeRowMutations.tableId;
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ SerializableCoder.of(SerializableConfiguration.class)
+ .encode(new SerializableConfiguration(this.configuration), out);
+
+ StringUtf8Coder.of().encode(this.tableId, out);
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException {
+ this.configuration =
SerializableCoder.of(SerializableConfiguration.class).decode(in).get();
+ this.tableId = StringUtf8Coder.of().decode(in);
+ }
+
+ Object readResolve() {
+ return
HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId);
+ }
+
+ private Configuration configuration;
+ private String tableId;
+ }
+
+ private final Configuration configuration;
+ private final String tableId;
+
+ /** Function to write row mutations to a hbase table. */
+ private class WriteRowMutationsFn extends DoFn<KV<byte[], RowMutations>,
Integer> {
+
+ public WriteRowMutationsFn(
+ WriteRowMutations writeRowMutations) { // , HbaseSharedConnection
hbaseSharedConnection) {
+ checkNotNull(writeRowMutations.tableId, "tableId");
+ checkNotNull(writeRowMutations.configuration, "configuration");
+ }
+
+ @Setup
+ public void setup() throws Exception {
+ connection = HBaseSharedConnection.getOrCreate(configuration);
+ }
+
+ @StartBundle
+ public void startBundle(StartBundleContext c) throws IOException {
+ table = connection.getTable(TableName.valueOf(tableId));
+ recordsWritten = 0;
+ }
+
+ @FinishBundle
+ public void finishBundle() throws Exception {
+ if (table != null) {
+ table.close();
+ table = null;
+ }
+
+ LOG.debug("Wrote {} records", recordsWritten);
+ }
+
+ @Teardown
+ public void tearDown() throws Exception {
+
+ if (table != null) {
+ table.close();
+ table = null;
+ }
+
+ HBaseSharedConnection.close();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ RowMutations mutations = c.element().getValue();
+
+ try {
+ // Use Table instead of BufferedMutator to preserve mutation-ordering
+ table.mutateRow(mutations);
+ } catch (Exception e) {
+ throw new Exception(
+ (String.join(
+ " ",
+ "Table",
+ tableId,
+ "row",
+ Bytes.toString(mutations.getRow()),
+ "mutation failed.",
+ "\nTable Available/Enabled:",
+ Boolean.toString(
+
connection.getAdmin().isTableAvailable(TableName.valueOf(tableId))),
+ Boolean.toString(
+
connection.getAdmin().isTableEnabled(TableName.valueOf(tableId))),
+ "\nConnection Closed/Aborted/Locks:",
+ Boolean.toString(connection.isClosed()),
+ Boolean.toString(connection.isAborted()))));
+ }
+
+ // Dummy output so that we can get Dataflow stats for throughput.
+ c.output(1);
Review Comment:
Dummy output may not be suitable for production code. Either implement
WriteResult or use PDone for now (and implement WriteResult as a future task).
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +796,195 @@ public void populateDisplayData(DisplayData.Builder
builder) {
private transient BufferedMutator mutator;
}
}
+
+ public static WriteRowMutations writeRowMutations() {
+ return new WriteRowMutations(null /* Configuration */, "");
+ }
+
+ /** Transformation that writes RowMutation objects to a Hbase table. */
+ public static class WriteRowMutations
+ extends PTransform<PCollection<KV<byte[], RowMutations>>,
PCollection<Integer>> {
+
+ /** Writes to the HBase instance indicated by the* given Configuration. */
+ public WriteRowMutations withConfiguration(Configuration configuration) {
+ checkNotNull(configuration, "configuration cannot be null");
+ return new WriteRowMutations(configuration, tableId);
+ }
+
+ /** Writes to the specified table. */
+ public WriteRowMutations withTableId(String tableId) {
+ checkNotNull(tableId, "tableId cannot be null");
+ return new WriteRowMutations(configuration, tableId);
+ }
+
+ private WriteRowMutations(Configuration configuration, String tableId) {
+ this.configuration = configuration;
+ this.tableId = tableId;
+ }
+
+ @Override
+ public PCollection<Integer> expand(PCollection<KV<byte[], RowMutations>>
input) {
+ checkNotNull(configuration, "withConfiguration() is required");
+ checkNotNull(tableId, "withTableId() is required");
+ checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty");
+
+ return input.apply(ParDo.of(new WriteRowMutationsFn(this)));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("configuration", configuration.toString()));
+ builder.add(DisplayData.item("tableId", tableId));
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public String getTableId() {
+ return tableId;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ WriteRowMutations writeRowMutations = (WriteRowMutations) o;
+ return
configuration.toString().equals(writeRowMutations.configuration.toString())
+ && Objects.equals(tableId, writeRowMutations.tableId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(configuration, tableId);
+ }
+
+ /**
+ * The writeReplace method allows the developer to provide a replacement
object that will be
+ * serialized instead of the original one. We use this to keep the
enclosed class immutable. For
+ * more details on the technique see <a
+ *
href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
+ * article</a>.
+ */
+ private Object writeReplace() {
+ return new SerializationProxy(this);
+ }
+
+ private static class SerializationProxy implements Serializable {
+ public SerializationProxy() {}
+
+ public SerializationProxy(WriteRowMutations writeRowMutations) {
+ configuration = writeRowMutations.configuration;
+ tableId = writeRowMutations.tableId;
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ SerializableCoder.of(SerializableConfiguration.class)
+ .encode(new SerializableConfiguration(this.configuration), out);
+
+ StringUtf8Coder.of().encode(this.tableId, out);
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException {
+ this.configuration =
SerializableCoder.of(SerializableConfiguration.class).decode(in).get();
+ this.tableId = StringUtf8Coder.of().decode(in);
+ }
+
+ Object readResolve() {
+ return
HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId);
+ }
+
+ private Configuration configuration;
+ private String tableId;
+ }
+
+ private final Configuration configuration;
+ private final String tableId;
+
+ /** Function to write row mutations to a hbase table. */
+ private class WriteRowMutationsFn extends DoFn<KV<byte[], RowMutations>,
Integer> {
+
+ public WriteRowMutationsFn(
+ WriteRowMutations writeRowMutations) { // , HbaseSharedConnection
hbaseSharedConnection) {
+ checkNotNull(writeRowMutations.tableId, "tableId");
+ checkNotNull(writeRowMutations.configuration, "configuration");
+ }
+
+ @Setup
+ public void setup() throws Exception {
+ connection = HBaseSharedConnection.getOrCreate(configuration);
+ }
+
+ @StartBundle
+ public void startBundle(StartBundleContext c) throws IOException {
+ table = connection.getTable(TableName.valueOf(tableId));
+ recordsWritten = 0;
+ }
+
+ @FinishBundle
+ public void finishBundle() throws Exception {
+ if (table != null) {
+ table.close();
+ table = null;
+ }
+
+ LOG.debug("Wrote {} records", recordsWritten);
Review Comment:
This is always 0. It looks like a line incrementing recordsWritten is missing
somewhere (e.g. in processElement after a successful mutateRow)?
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -150,6 +155,32 @@
* .withTableId("table"));
* }</pre>
*
+ * <h4>Writing {@link RowMutations}</h4>
+ *
+ * <p>An alternative way to write to HBase is with {@link
HBaseIO#writeRowMutations()}, which takes
+ * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs
of byte row keys and
+ * {@link RowMutations}.
+ *
+ * <p>This implementation is Dataflow specific. Useful for preserving mutation
order if the upstream
+ * is ordered by row key, as RowMutations will only be applied after previous
RowMutations are
+ * successful.
+ *
+ * <p>To configure the sink, you must supply a table id string and a {@link
Configuration} to
+ * identify the HBase instance, for example:
+ *
+ * <pre>{@code
+ * Configuration configuration = ...;
+ * PCollection<KV<byte[], RowMutations>> data = ...;
+ *
+ * data.apply("write",
+ * HBaseIO.writeRowMutations()
+ * .withConfiguration(configuration)
+ * .withTableId("table"));
+ * }</pre>
+ *
+ * <p>Note that the transformation emits the number of RowMutations written as
an integer after
Review Comment:
It would be nice to emit a WriteResult (as BigtableIO does). To my
knowledge, emitting the number of elements written is not common practice for
Beam sinks. The other common practice (PDone, which HBaseIO.Write
currently uses) is an antipattern and will be changed in the future.
More info: https://beam.apache.org/documentation/io/io-standards/#general
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -150,6 +155,32 @@
* .withTableId("table"));
* }</pre>
*
+ * <h4>Writing {@link RowMutations}</h4>
+ *
+ * <p>An alternative way to write to HBase is with {@link
HBaseIO#writeRowMutations()}, which takes
+ * as input a {@link PCollection<KV<byte[], RowMutations>>}, representing KVs
of byte row keys and
+ * {@link RowMutations}.
+ *
+ * <p>This implementation is Dataflow specific. Useful for preserving mutation
order if the upstream
Review Comment:
Documentation for a specific IO connector generally does not make runner-specific
statements, because the IO connector artifact does not depend on dataflow-runner-java.
What makes this Dataflow-specific?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]