Apache9 commented on code in PR #6578:
URL: https://github.com/apache/hbase/pull/6578#discussion_r1929528151


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java:
##########
@@ -153,6 +156,8 @@ private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
       filter = walEntryFilterClass == null
         ? null
        : (WALEntrySinkFilter) walEntryFilterClass.getDeclaredConstructor().newInstance();
+    } catch (RuntimeException e) {

Review Comment:
   Here too.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java:
##########
@@ -139,6 +140,8 @@ public ReplicationSink(Configuration conf, RegionServerCoprocessorHost rsServerH
       Class<? extends SourceFSConfigurationProvider> c =
         Class.forName(className).asSubclass(SourceFSConfigurationProvider.class);
       this.provider = c.getDeclaredConstructor().newInstance();
+    } catch (RuntimeException e) {

Review Comment:
   Is this for debugging?



##########
hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java:
##########
@@ -968,6 +968,9 @@ public enum OperationStatusCode {
   public static final String REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT =
     "org.apache.hadoop.hbase.replication.ReplicationSinkServiceImpl";
  public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled";
+  public static final String REPLICATION_SINK_TRANSLATOR = "hbase.replication.sink.translator";

Review Comment:
   We'd better not put things in HConstants; put them into the package where we use them?

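The suggestion amounts to declaring the key beside its only consumer instead of in the shared HConstants class. A minimal sketch, where `Main` stands in for ReplicationSink (the key string itself is the one from the diff):

```java
// Sketch of keeping a config key next to the class that reads it,
// rather than in HConstants. "Main" is a stand-in for ReplicationSink.
public class Main {
  // Defined where it is used, so the key's scope matches its consumer.
  public static final String REPLICATION_SINK_TRANSLATOR = "hbase.replication.sink.translator";

  public static void main(String[] args) {
    System.out.println(REPLICATION_SINK_TRANSLATOR);
  }
}
```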


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java:
##########
@@ -236,81 +258,71 @@ public void replicateEntries(List<WALEntry> entries, final ExtendedCellScanner c
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
           }
           ExtendedCell cell = cells.current();
-          // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+            // Bulk load events
             BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
             if (bld.getReplicate()) {
-              if (bulkLoadsPerClusters == null) {
-                bulkLoadsPerClusters = new HashMap<>();
-              }
-              // Map of table name Vs list of pair of family and list of
-              // hfile paths from its namespace
+              // Map of tableNameStr to (family, hfile paths) pairs
               Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
-              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
+              buildBulkLoadHFileMap(bulkLoadHFileMap, bld);
             }
          } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {
             Mutation put = processReplicationMarkerEntry(cell);
             if (put == null) {
               continue;
             }
-            table = REPLICATION_SINK_TRACKER_TABLE_NAME;
-            List<UUID> clusterIds = new ArrayList<>();
-            for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
-              clusterIds.add(toUUID(clusterId));
-            }
+            List<UUID> clusterIds = getSourceClusterIds(entry);
             put.setClusterIds(clusterIds);
-            addToHashMultiMap(rowMap, table, clusterIds, put);
+            addToHashMultiMap(sinkRowMap, REPLICATION_SINK_TRACKER_TABLE_NAME, clusterIds, put);
           } else {
-            // Handle wal replication
-            if (isNewRowOrType(previousCell, cell)) {
-              // Create new mutation
-              mutation = CellUtil.isDelete(cell)
-                ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
-                : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
-              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
-              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
-                clusterIds.add(toUUID(clusterId));
-              }
-              mutation.setClusterIds(clusterIds);
-              mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
+            TableName sinkTableName = translator.getSinkTableName(tableName);
+            ExtendedCell sinkCell = translator.getSinkExtendedCell(tableName, cell);

Review Comment:
   This is the reason why I want to see the javadoc for this method: why do we need to pass the original table name in? And I think we will just do tableName mapping, so we do not need to call the above getSinkTableName every time, as all the cells from the WALEntry are for the same table?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java:
##########
@@ -181,6 +186,19 @@ private void decorateConf() {
     }
   }
 
+  private ReplicationSinkTranslator getReplicationSinkTranslator() throws IOException {
+    Class<?> translatorClass = this.conf.getClass(HConstants.REPLICATION_SINK_TRANSLATOR,
+      IdentityReplicationSinkTranslator.class, ReplicationSinkTranslator.class);
+    try {
+      return (ReplicationSinkTranslator) translatorClass.getDeclaredConstructor().newInstance();

Review Comment:
   IIRC we have a ReflectionUtils or something for calling constructors of a class. And do we not need to pass the Configuration object to it here? Maybe the translator needs to load some configuration?

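The constructor-selection behavior the reviewer alludes to (try a configuration-accepting constructor first, then fall back to the no-arg one) can be sketched with plain JDK reflection. This is only an illustration of the pattern: `java.util.Properties` stands in for HBase's Configuration, and the translator types are hypothetical:

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

// Stand-in types for the sketch; not the PR's classes.
interface SinkTranslator {}

class ConfAwareTranslator implements SinkTranslator {
  final Properties conf;
  ConfAwareTranslator(Properties conf) { this.conf = conf; } // conf-aware ctor
}

class NoArgTranslator implements SinkTranslator {
  NoArgTranslator() {}
}

public class Main {
  // Prefer a constructor taking the configuration so implementations can read
  // their settings; fall back to the no-arg constructor otherwise.
  static SinkTranslator newTranslator(Class<? extends SinkTranslator> clazz, Properties conf)
      throws ReflectiveOperationException {
    try {
      Constructor<? extends SinkTranslator> c = clazz.getDeclaredConstructor(Properties.class);
      return c.newInstance(conf);
    } catch (NoSuchMethodException e) {
      return clazz.getDeclaredConstructor().newInstance();
    }
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    Properties conf = new Properties();
    System.out.println(newTranslator(ConfAwareTranslator.class, conf).getClass().getSimpleName());
    System.out.println(newTranslator(NoArgTranslator.class, conf).getClass().getSimpleName());
  }
}
```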


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/IdentityReplicationSinkTranslator.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private

Review Comment:
   This should be IA.LimitedPrivate("CONFIG")? Do we expect users to use it directly in their code?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java:
##########
@@ -181,6 +186,19 @@ private void decorateConf() {
     }
   }
 
+  private ReplicationSinkTranslator getReplicationSinkTranslator() throws IOException {
+    Class<?> translatorClass = this.conf.getClass(HConstants.REPLICATION_SINK_TRANSLATOR,
+      IdentityReplicationSinkTranslator.class, ReplicationSinkTranslator.class);
+    try {
+      return (ReplicationSinkTranslator) translatorClass.getDeclaredConstructor().newInstance();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.warn("Failed to instantiate " + translatorClass);
+      return new IdentityReplicationSinkTranslator();

Review Comment:
   Is falling back to the default implementation the right choice here? I'm not sure...

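A fail-fast alternative to the silent fallback could look like the sketch below: a misconfigured translator class stops the sink from starting instead of being quietly replaced by the default. The types are stand-ins, not the PR's classes:

```java
import java.io.IOException;

// Stand-in types for the sketch.
interface Translator {}

class IdentityTranslator implements Translator {}

class BrokenTranslator implements Translator {
  BrokenTranslator() { throw new IllegalStateException("boom"); }
}

public class Main {
  // Fail fast: wrap any reflective failure in an IOException so the operator
  // sees the misconfiguration at startup, rather than getting the default
  // translator and silently unmapped replication.
  static Translator newTranslator(Class<? extends Translator> clazz) throws IOException {
    try {
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IOException("Failed to instantiate " + clazz.getName(), e);
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(newTranslator(IdentityTranslator.class).getClass().getSimpleName());
  }
}
```

Note that a constructor throwing at runtime surfaces as an InvocationTargetException, which the `ReflectiveOperationException` catch also covers.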


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkTranslator.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface ReplicationSinkTranslator {

Review Comment:
   Better add some javadocs to explain the meanings of the methods and the usage of this class?

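One possible shape for the requested javadoc, sketched with `String` standing in for TableName and ExtendedCell so the example compiles on its own; the wording is illustrative, not taken from the PR:

```java
// Sketch of a javadoc'd translator interface. Stand-in types only.

/**
 * Translates replicated entries on the sink cluster before the sink applies
 * them, e.g. to redirect entries to a different table or rewrite cells.
 * An identity implementation leaves everything unchanged.
 */
interface SinkTranslatorSketch {
  /**
   * Returns the sink-side table that entries originating from
   * {@code sourceTable} should be applied to.
   */
  String getSinkTableName(String sourceTable);

  /**
   * Returns the cell to persist at the sink for {@code cell}, which was read
   * from {@code sourceTable} on the source cluster.
   */
  String getSinkCell(String sourceTable, String cell);
}

public class Main {
  public static void main(String[] args) {
    SinkTranslatorSketch identity = new SinkTranslatorSketch() {
      public String getSinkTableName(String t) { return t; }
      public String getSinkCell(String t, String c) { return c; }
    };
    System.out.println(identity.getSinkTableName("t1") + "/" + identity.getSinkCell("t1", "cell"));
  }
}
```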


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
