Abacn commented on code in PR #25864:
URL: https://github.com/apache/beam/pull/25864#discussion_r1145484111


##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+/**
+ * Row mutations coder to provide serialization support for Hbase RowMutations object, which isn't
+ * natively serializable.
+ */
+class HBaseRowMutationsCoder extends StructuredCoder<RowMutations> implements Serializable {
+  private static final HBaseRowMutationsCoder INSTANCE = new HBaseRowMutationsCoder();
+
+  public HBaseRowMutationsCoder() {
+    byteArrayCoder = ByteArrayCoder.of();
+    listCoder = ListCoder.of(HBaseMutationCoder.of());
+  }
+
+  public static HBaseRowMutationsCoder of() {
+    return INSTANCE;
+  }
+
+  private final ByteArrayCoder byteArrayCoder;
+  private final ListCoder<Mutation> listCoder;
+
+  @Override
+  public void encode(RowMutations value, OutputStream outStream) throws IOException {
+    byteArrayCoder.encode(value.getRow(), outStream);
+    listCoder.encode(value.getMutations(), outStream);
+  }
+
+  @Override
+  public RowMutations decode(InputStream inStream) throws IOException {
+
+    byte[] rowKey = byteArrayCoder.decode(inStream);
+    List<Mutation> mutations = listCoder.decode(inStream);
+
+    RowMutations rowMutations = new RowMutations(rowKey);
+    for (Mutation m : mutations) {
+      MutationType type = getType(m);
+
+      if (type == MutationType.PUT) {
+        rowMutations.add((Put) m);
+      } else if (type == MutationType.DELETE) {
+        rowMutations.add((Delete) m);
+      }

Review Comment:
   Do we need to raise IllegalArgumentException here instead of implicitly ignoring the mutation, as getType() does below?
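
   For illustration, roughly something like this in the decode loop (untested sketch; the exception message is just an example):

```java
      if (type == MutationType.PUT) {
        rowMutations.add((Put) m);
      } else if (type == MutationType.DELETE) {
        rowMutations.add((Delete) m);
      } else {
        // Surface unsupported mutation types instead of silently dropping them.
        throw new IllegalArgumentException(
            "Only PUT and DELETE are supported in RowMutations, got " + m.getClass().getName());
      }
```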



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static connection shared between all threads of a worker, i.e. connectors are transient within
+ * single worker machine. Connectors are not persisted between worker machines as Connection
+ * serialization is not implemented. Each worker will create its own connection and share it between
+ * all its threads.
+ */
+class HBaseSharedConnection implements Serializable {
+  private static final long serialVersionUID = 5252999807656940415L;
+  private static final Logger LOG = LoggerFactory.getLogger(HBaseSharedConnection.class);
+
+  // Transient connection to be initialized per worker
+  // Wrap Connection in array because static Connection cannot be non-null in beam repo
+  private static @MonotonicNonNull Connection connection = null;
+  // Number of threads using the shared connection, close connection if connectionCount goes to 0
+  private static int connectionCount;
+
+  /**
+   * Create or return existing Hbase connection.
+   *
+   * @param configuration Hbase configuration
+   * @return Hbase connection
+   * @throws IOException
+   */
+  public static synchronized @MonotonicNonNull Connection getOrCreate(Configuration configuration)
+      throws IOException {
+    if (connection == null || connection.isClosed()) {
+      connection = ConnectionFactory.createConnection(configuration);
+      connectionCount = 0;
+    }
+    connectionCount++;
+    return connection;
+  }
+
+  /**
+   * Decrement connector count and close connection if no more connector is using it.
+   *
+   * @throws IOException
+   */
+  public static synchronized void close() throws IOException {
+    connectionCount--;
+    if (connectionCount == 0) {
+      if (connection != null) {
+        connection.close();
+      }
+    }
+    if (connectionCount < 0) {
+      LOG.warn("Connection count at " + connectionCount + ", should not be possible");

Review Comment:
   Looking at the class alone, it is actually possible if close() is called multiple times, because this is a public method. Warn that a closed connection is being closed again and set the counter back to 0?
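
   As a sketch of that idea (untested):

```java
  public static synchronized void close() throws IOException {
    if (connectionCount == 0) {
      // close() was called more times than getOrCreate(); warn and keep the count at 0.
      LOG.warn("Attempted to close an already closed connection");
      return;
    }
    connectionCount--;
    if (connectionCount == 0 && connection != null) {
      connection.close();
    }
  }
```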



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseSharedConnection.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static connection shared between all threads of a worker, i.e. connectors are transient within
+ * single worker machine. Connectors are not persisted between worker machines as Connection
+ * serialization is not implemented. Each worker will create its own connection and share it between
+ * all its threads.
+ */
+class HBaseSharedConnection implements Serializable {
+  private static final long serialVersionUID = 5252999807656940415L;
+  private static final Logger LOG = LoggerFactory.getLogger(HBaseSharedConnection.class);
+
+  // Transient connection to be initialized per worker
+  // Wrap Connection in array because static Connection cannot be non-null in beam repo
+  private static @MonotonicNonNull Connection connection = null;
+  // Number of threads using the shared connection, close connection if connectionCount goes to 0
+  private static int connectionCount;
+
+  /**
+   * Create or return existing Hbase connection.
+   *
+   * @param configuration Hbase configuration
+   * @return Hbase connection
+   * @throws IOException
+   */
+  public static synchronized @MonotonicNonNull Connection getOrCreate(Configuration configuration)

Review Comment:
   No need for `@MonotonicNonNull` here? It should always return a non-null connection.
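
   i.e. a plain return type, something like the sketch below (the nullness checker may still want a local variable to prove the returned value is non-null):

```java
  public static synchronized Connection getOrCreate(Configuration configuration) throws IOException {
    if (connection == null || connection.isClosed()) {
      connection = ConnectionFactory.createConnection(configuration);
      connectionCount = 0;
    }
    connectionCount++;
    return connection;
  }
```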



##########
sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseRowMutationsCoder.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hbase;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+/**
+ * Row mutations coder to provide serialization support for Hbase RowMutations object, which isn't
+ * natively serializable.
+ */
+class HBaseRowMutationsCoder extends StructuredCoder<RowMutations> implements Serializable {
+  private static final HBaseRowMutationsCoder INSTANCE = new HBaseRowMutationsCoder();
+
+  public HBaseRowMutationsCoder() {
+    byteArrayCoder = ByteArrayCoder.of();
+    listCoder = ListCoder.of(HBaseMutationCoder.of());
+  }
+
+  public static HBaseRowMutationsCoder of() {
+    return INSTANCE;
+  }
+
+  private final ByteArrayCoder byteArrayCoder;
+  private final ListCoder<Mutation> listCoder;
+
+  @Override
+  public void encode(RowMutations value, OutputStream outStream) throws IOException {
+    byteArrayCoder.encode(value.getRow(), outStream);
+    listCoder.encode(value.getMutations(), outStream);
+  }
+
+  @Override
+  public RowMutations decode(InputStream inStream) throws IOException {
+
+    byte[] rowKey = byteArrayCoder.decode(inStream);
+    List<Mutation> mutations = listCoder.decode(inStream);
+
+    RowMutations rowMutations = new RowMutations(rowKey);
+    for (Mutation m : mutations) {
+      MutationType type = getType(m);
+
+      if (type == MutationType.PUT) {
+        rowMutations.add((Put) m);
+      } else if (type == MutationType.DELETE) {
+        rowMutations.add((Delete) m);
+      }
+    }
+    return rowMutations;
+  }
+
+  @Override
+  public @UnknownKeyFor @NonNull @Initialized List<

Review Comment:
   These annotations are not necessary and should be added automatically during compilation. `public List<? extends Coder<?>>` should suffice. Same below.
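
   For example (sketch; assuming the override just returns the component coders):

```java
  @Override
  public List<? extends Coder<?>> getCoderArguments() {
    // Component coders used by encode()/decode() above.
    return Arrays.asList(byteArrayCoder, listCoder);
  }
```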



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
