Abacn commented on code in PR #25864:
URL: https://github.com/apache/beam/pull/25864#discussion_r1142490279
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +795,194 @@ public void populateDisplayData(DisplayData.Builder
builder) {
private transient BufferedMutator mutator;
}
}
+
+ public static WriteRowMutations writeRowMutations() {
+ return new WriteRowMutations(null /* Configuration */, "");
+ }
+
+ /** Transformation that writes RowMutation objects to a Hbase table. */
+ public static class WriteRowMutations
+ extends PTransform<PCollection<KV<byte[], RowMutations>>, PDone> {
+
+ /** Writes to the HBase instance indicated by the given Configuration. */
+ public WriteRowMutations withConfiguration(Configuration configuration) {
+ checkNotNull(configuration, "configuration cannot be null");
+ return new WriteRowMutations(configuration, tableId);
+ }
+
+ /** Writes to the specified table. */
+ public WriteRowMutations withTableId(String tableId) {
+ checkNotNull(tableId, "tableId cannot be null");
+ return new WriteRowMutations(configuration, tableId);
+ }
+
+ private WriteRowMutations(Configuration configuration, String tableId) {
+ this.configuration = configuration;
+ this.tableId = tableId;
+ }
+
+ @Override
+ public PDone expand(PCollection<KV<byte[], RowMutations>> input) {
+ checkNotNull(configuration, "withConfiguration() is required");
+ checkNotNull(tableId, "withTableId() is required");
+ checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty");
+
+ input.apply(ParDo.of(new WriteRowMutationsFn(this)));
+ return PDone.in(input.getPipeline());
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("configuration", configuration.toString()));
+ builder.add(DisplayData.item("tableId", tableId));
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public String getTableId() {
+ return tableId;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ WriteRowMutations writeRowMutations = (WriteRowMutations) o;
+ return
configuration.toString().equals(writeRowMutations.configuration.toString())
+ && Objects.equals(tableId, writeRowMutations.tableId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(configuration, tableId);
+ }
+
+ /**
+ * The writeReplace method allows the developer to provide a replacement
object that will be
+ * serialized instead of the original one. We use this to keep the
enclosed class immutable. For
+ * more details on the technique see <a
+ *
href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
+ * article</a>.
+ */
+ private Object writeReplace() {
+ return new SerializationProxy(this);
+ }
+
+ private static class SerializationProxy implements Serializable {
+ public SerializationProxy() {}
+
+ public SerializationProxy(WriteRowMutations writeRowMutations) {
+ configuration = writeRowMutations.configuration;
+ tableId = writeRowMutations.tableId;
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ SerializableCoder.of(SerializableConfiguration.class)
+ .encode(new SerializableConfiguration(this.configuration), out);
+
+ StringUtf8Coder.of().encode(this.tableId, out);
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException {
+ this.configuration =
SerializableCoder.of(SerializableConfiguration.class).decode(in).get();
+ this.tableId = StringUtf8Coder.of().decode(in);
+ }
+
+ Object readResolve() {
+ return
HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId);
+ }
+
+ private Configuration configuration;
+ private String tableId;
+ }
+
+ private final Configuration configuration;
+ private final String tableId;
+
+ /** Function to write row mutations to a hbase table. */
+ private class WriteRowMutationsFn extends DoFn<KV<byte[], RowMutations>,
Integer> {
+
+ public WriteRowMutationsFn(
+ WriteRowMutations writeRowMutations) { // , HbaseSharedConnection
hbaseSharedConnection) {
+ checkNotNull(writeRowMutations.tableId, "tableId");
+ checkNotNull(writeRowMutations.configuration, "configuration");
+ }
+
+ @Setup
+ public void setup() throws Exception {
+ connection = HBaseSharedConnection.getOrCreate(configuration);
+ }
+
+ @StartBundle
+ public void startBundle(StartBundleContext c) throws IOException {
+ table = connection.getTable(TableName.valueOf(tableId));
+ recordsWritten = 0;
+ }
+
+ @FinishBundle
+ public void finishBundle() throws Exception {
+ if (table != null) {
+ table.close();
+ table = null;
+ }
+
+ LOG.debug("Wrote {} records", recordsWritten);
+ }
+
+ @Teardown
+ public void tearDown() throws Exception {
+
+ if (table != null) {
+ table.close();
+ table = null;
+ }
+
+ HBaseSharedConnection.close();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ RowMutations mutations = c.element().getValue();
+
+ try {
+ // Use Table instead of BufferedMutator to preserve mutation-ordering
+ table.mutateRow(mutations);
+ recordsWritten++;
+ } catch (Exception e) {
+ throw new Exception(
Review Comment:
Catch the checked IOException and rethrow it wrapped in a RuntimeException?
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static connection shared between all threads of a worker. Connectors are
not persisted between
Review Comment:
i.e., connections are transient within a single worker machine
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static connection shared between all threads of a worker. Connectors are
not persisted between
+ * worker machines as Connection serialization is not implemented. Each worker
will create its own
+ * connection and share it between all its threads.
+ */
+public class HBaseSharedConnection implements Serializable {
+ private static final long serialVersionUID = 5252999807656940415L;
+ private static final Logger LOG =
LoggerFactory.getLogger(HBaseSharedConnection.class);
+
+ // Transient connection to be initialized per worker
+ // Wrap Connection in array because static Connection cannot be non-null in
beam repo
+ private static Connection[] connection = new Connection[1];
Review Comment:
Should use
```
private static @MonotonicNonNull Connection connection = null;
```
here, then assign the connection in getOrCreate. This resolves the Checker Framework
violation.
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+
+/**
+ * Row mutations coder to provide serialization support for Hbase RowMutations
object, which isn't
+ * natively serializable.
+ */
+public class HBaseRowMutationsCoder extends AtomicCoder<RowMutations>
implements Serializable {
Review Comment:
This should be a StructuredCoder<RowMutations> and use existing
AtomicCoders to encode/decode rather than operating on the inStream/outStream
directly. We already have the needed AtomicCoders (ByteArrayCoder for the key,
ListCoder and HBaseMutationCoder for the list of mutations).
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java:
##########
@@ -765,4 +795,194 @@ public void populateDisplayData(DisplayData.Builder
builder) {
private transient BufferedMutator mutator;
}
}
+
+ public static WriteRowMutations writeRowMutations() {
+ return new WriteRowMutations(null /* Configuration */, "");
+ }
+
+ /** Transformation that writes RowMutation objects to a Hbase table. */
+ public static class WriteRowMutations
+ extends PTransform<PCollection<KV<byte[], RowMutations>>, PDone> {
+
+ /** Writes to the HBase instance indicated by the given Configuration. */
+ public WriteRowMutations withConfiguration(Configuration configuration) {
+ checkNotNull(configuration, "configuration cannot be null");
+ return new WriteRowMutations(configuration, tableId);
+ }
+
+ /** Writes to the specified table. */
+ public WriteRowMutations withTableId(String tableId) {
+ checkNotNull(tableId, "tableId cannot be null");
+ return new WriteRowMutations(configuration, tableId);
+ }
+
+ private WriteRowMutations(Configuration configuration, String tableId) {
+ this.configuration = configuration;
+ this.tableId = tableId;
+ }
+
+ @Override
+ public PDone expand(PCollection<KV<byte[], RowMutations>> input) {
+ checkNotNull(configuration, "withConfiguration() is required");
+ checkNotNull(tableId, "withTableId() is required");
+ checkArgument(!tableId.isEmpty(), "withTableId() cannot be empty");
+
+ input.apply(ParDo.of(new WriteRowMutationsFn(this)));
+ return PDone.in(input.getPipeline());
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("configuration", configuration.toString()));
+ builder.add(DisplayData.item("tableId", tableId));
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public String getTableId() {
+ return tableId;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ WriteRowMutations writeRowMutations = (WriteRowMutations) o;
+ return
configuration.toString().equals(writeRowMutations.configuration.toString())
+ && Objects.equals(tableId, writeRowMutations.tableId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(configuration, tableId);
+ }
+
+ /**
+ * The writeReplace method allows the developer to provide a replacement
object that will be
+ * serialized instead of the original one. We use this to keep the
enclosed class immutable. For
+ * more details on the technique see <a
+ *
href="https://lingpipe-blog.com/2009/08/10/serializing-immutable-singletons-serialization-proxy/">this
+ * article</a>.
+ */
+ private Object writeReplace() {
+ return new SerializationProxy(this);
+ }
+
+ private static class SerializationProxy implements Serializable {
+ public SerializationProxy() {}
+
+ public SerializationProxy(WriteRowMutations writeRowMutations) {
+ configuration = writeRowMutations.configuration;
+ tableId = writeRowMutations.tableId;
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ SerializableCoder.of(SerializableConfiguration.class)
+ .encode(new SerializableConfiguration(this.configuration), out);
+
+ StringUtf8Coder.of().encode(this.tableId, out);
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException {
+ this.configuration =
SerializableCoder.of(SerializableConfiguration.class).decode(in).get();
+ this.tableId = StringUtf8Coder.of().decode(in);
+ }
+
+ Object readResolve() {
+ return
HBaseIO.writeRowMutations().withConfiguration(configuration).withTableId(tableId);
+ }
+
+ private Configuration configuration;
+ private String tableId;
+ }
+
+ private final Configuration configuration;
+ private final String tableId;
+
+ /** Function to write row mutations to a hbase table. */
+ private class WriteRowMutationsFn extends DoFn<KV<byte[], RowMutations>,
Integer> {
+
+ public WriteRowMutationsFn(
+ WriteRowMutations writeRowMutations) { // , HbaseSharedConnection
hbaseSharedConnection) {
Review Comment:
Clean up the leftover code in this comment?
##########
sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/TestConstants.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase.utils;
+
+import java.nio.charset.StandardCharsets;
+
+/** Constants used for testing purposes. */
+public class TestConstants {
Review Comment:
also here, no need to be public
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static connection shared between all threads of a worker. Connectors are
not persisted between
+ * worker machines as Connection serialization is not implemented. Each worker
will create its own
+ * connection and share it between all its threads.
+ */
+public class HBaseSharedConnection implements Serializable {
Review Comment:
no need to be public?
##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+
+/**
+ * Row mutations coder to provide serialization support for Hbase RowMutations
object, which isn't
+ * natively serializable.
+ */
+public class HBaseRowMutationsCoder extends AtomicCoder<RowMutations>
implements Serializable {
Review Comment:
no need to be public
##########
sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/utils/HashUtils.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+
+/** Utility functions to help assert equality between mutation lists for
testing purposes. */
+public class HashUtils {
Review Comment:
no need to be public?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]