Apache9 commented on code in PR #6578:
URL: https://github.com/apache/hbase/pull/6578#discussion_r1929528151
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java:
##########
@@ -153,6 +156,8 @@ private WALEntrySinkFilter setupWALEntrySinkFilter() throws
IOException {
filter = walEntryFilterClass == null
? null
: (WALEntrySinkFilter)
walEntryFilterClass.getDeclaredConstructor().newInstance();
+ } catch (RuntimeException e) {
Review Comment:
Here too.
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java:
##########
@@ -139,6 +140,8 @@ public ReplicationSink(Configuration conf,
RegionServerCoprocessorHost rsServerH
Class<? extends SourceFSConfigurationProvider> c =
Class.forName(className).asSubclass(SourceFSConfigurationProvider.class);
this.provider = c.getDeclaredConstructor().newInstance();
+ } catch (RuntimeException e) {
Review Comment:
Is this for debugging?
##########
hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java:
##########
@@ -968,6 +968,9 @@ public enum OperationStatusCode {
public static final String REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT =
"org.apache.hadoop.hbase.replication.ReplicationSinkServiceImpl";
public static final String REPLICATION_BULKLOAD_ENABLE_KEY =
"hbase.replication.bulkload.enabled";
+ public static final String REPLICATION_SINK_TRANSLATOR =
"hbase.replication.sink.translator";
Review Comment:
We'd better not put things in HConstants. Could we put this constant in the
package where it is used?
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java:
##########
@@ -236,81 +258,71 @@ public void replicateEntries(List<WALEntry> entries,
final ExtendedCellScanner c
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
ExtendedCell cell = cells.current();
- // Handle bulk load hfiles replication
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+ // Bulk load events
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
if (bld.getReplicate()) {
- if (bulkLoadsPerClusters == null) {
- bulkLoadsPerClusters = new HashMap<>();
- }
- // Map of table name Vs list of pair of family and list of
- // hfile paths from its namespace
+ // Map of tableNameStr to (family, hfile paths) pairs
Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(),
k -> new HashMap<>());
- buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
+ buildBulkLoadHFileMap(bulkLoadHFileMap, bld);
}
} else if (CellUtil.matchingQualifier(cell,
WALEdit.REPLICATION_MARKER)) {
Mutation put = processReplicationMarkerEntry(cell);
if (put == null) {
continue;
}
- table = REPLICATION_SINK_TRACKER_TABLE_NAME;
- List<UUID> clusterIds = new ArrayList<>();
- for (HBaseProtos.UUID clusterId :
entry.getKey().getClusterIdsList()) {
- clusterIds.add(toUUID(clusterId));
- }
+ List<UUID> clusterIds = getSourceClusterIds(entry);
put.setClusterIds(clusterIds);
- addToHashMultiMap(rowMap, table, clusterIds, put);
+ addToHashMultiMap(sinkRowMap, REPLICATION_SINK_TRACKER_TABLE_NAME,
clusterIds, put);
} else {
- // Handle wal replication
- if (isNewRowOrType(previousCell, cell)) {
- // Create new mutation
- mutation = CellUtil.isDelete(cell)
- ? new Delete(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength())
- : new Put(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
- List<UUID> clusterIds = new
ArrayList<>(entry.getKey().getClusterIdsList().size());
- for (HBaseProtos.UUID clusterId :
entry.getKey().getClusterIdsList()) {
- clusterIds.add(toUUID(clusterId));
- }
- mutation.setClusterIds(clusterIds);
- mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
+ TableName sinkTableName = translator.getSinkTableName(tableName);
+ ExtendedCell sinkCell = translator.getSinkExtendedCell(tableName,
cell);
Review Comment:
This is why I want to see the javadoc for this method: why do we need to pass
the original table name in? Also, I think we will only do table name mapping,
so we do not need to call getSinkTableName above for every cell, since all the
cells from a WALEntry are for the same table?
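
A rough sketch of the hoisting I have in mind. Everything here is a stand-in: plain Strings replace TableName and ExtendedCell, and `tableNameMapper` is a hypothetical substitute for `translator.getSinkTableName`. It only shows the table name being mapped once per entry instead of once per cell.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Sketch only: Strings stand in for TableName/ExtendedCell (assumption).
final class PerEntryTableMappingSketch {
  /** Counts mapper invocations so the effect of hoisting is visible. */
  static int mapperCalls = 0;

  static List<String> replicateEntry(String sourceTable, List<String> cells,
      UnaryOperator<String> tableNameMapper) {
    // All cells of one WAL entry belong to the same table: map the name once.
    String sinkTable = tableNameMapper.apply(sourceTable);
    List<String> out = new ArrayList<>();
    for (String cell : cells) {
      // Per-cell work reuses the already mapped table name.
      out.add(sinkTable + "/" + cell);
    }
    return out;
  }

  public static void main(String[] args) {
    UnaryOperator<String> mapper = t -> {
      mapperCalls++;
      return "sink_" + t;
    };
    System.out.println(replicateEntry("t1", Arrays.asList("c1", "c2", "c3"), mapper));
    System.out.println("mapper calls: " + mapperCalls);
  }
}
```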
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java:
##########
@@ -181,6 +186,19 @@ private void decorateConf() {
}
}
+ private ReplicationSinkTranslator getReplicationSinkTranslator() throws
IOException {
+ Class<?> translatorClass =
this.conf.getClass(HConstants.REPLICATION_SINK_TRANSLATOR,
+ IdentityReplicationSinkTranslator.class,
ReplicationSinkTranslator.class);
+ try {
+ return (ReplicationSinkTranslator)
translatorClass.getDeclaredConstructor().newInstance();
Review Comment:
IIRC we have a ReflectionUtils or something similar for calling the constructor
of a class. Do we not need to pass the Configuration object to it here? The
translator may need to load some configuration.
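
To illustrate what I mean by passing the configuration in, here is a minimal sketch using plain java.lang.reflect. It is not the real ReflectionUtils API: a `Map<String, String>` stands in for Configuration, and the `Translator` types and the `sketch.sink.prefix` key are hypothetical. The helper prefers a constructor that accepts the configuration and otherwise uses the no-arg one.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

// Sketch only: Map<String, String> stands in for Configuration (assumption).
final class ConfAwareInstantiationSketch {
  interface Translator {
    String sinkTableName(String source);
  }

  /** Hypothetical translator that reads a setting from the configuration. */
  static final class PrefixTranslator implements Translator {
    private final String prefix;

    public PrefixTranslator(Map<String, String> conf) {
      this.prefix = conf.getOrDefault("sketch.sink.prefix", "");
    }

    @Override
    public String sinkTableName(String source) {
      return prefix + source;
    }
  }

  /** Hypothetical translator that needs no configuration. */
  static final class IdentityTranslator implements Translator {
    public IdentityTranslator() {
    }

    @Override
    public String sinkTableName(String source) {
      return source;
    }
  }

  static <T> T newInstance(Class<? extends T> clazz, Map<String, String> conf)
      throws ReflectiveOperationException {
    try {
      // Prefer a constructor that takes the configuration.
      Constructor<? extends T> withConf = clazz.getDeclaredConstructor(Map.class);
      return withConf.newInstance(conf);
    } catch (NoSuchMethodException e) {
      // No such constructor: fall back to the no-arg constructor.
      return clazz.getDeclaredConstructor().newInstance();
    }
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    Map<String, String> conf = new HashMap<>();
    conf.put("sketch.sink.prefix", "dr_");
    Translator withConf = newInstance(PrefixTranslator.class, conf);
    Translator noConf = newInstance(IdentityTranslator.class, conf);
    System.out.println(withConf.sinkTableName("t1"));
    System.out.println(noConf.sinkTableName("t1"));
  }
}
```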
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/IdentityReplicationSinkTranslator.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
Review Comment:
Should this be IA.LimitedPrivate("CONFIG")? Do we expect users to use it
directly in their code?
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java:
##########
@@ -181,6 +186,19 @@ private void decorateConf() {
}
}
+ private ReplicationSinkTranslator getReplicationSinkTranslator() throws
IOException {
+ Class<?> translatorClass =
this.conf.getClass(HConstants.REPLICATION_SINK_TRANSLATOR,
+ IdentityReplicationSinkTranslator.class,
ReplicationSinkTranslator.class);
+ try {
+ return (ReplicationSinkTranslator)
translatorClass.getDeclaredConstructor().newInstance();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.warn("Failed to instantiate " + translatorClass);
+ return new IdentityReplicationSinkTranslator();
Review Comment:
Is falling back to the default implementation the right choice here? I'm not
sure...
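
If we decide that a silent fallback is not what we want, one alternative is to fail fast and surface the cause. This is only a sketch of that option, not a claim that it is the right one: the `Translator` types are hypothetical stand-ins, and the real method would work with ReplicationSinkTranslator and the configured class.

```java
import java.io.IOException;

// Sketch only: hypothetical stand-in types, not the HBase classes.
final class FailFastTranslatorLoadingSketch {
  interface Translator {
  }

  static final class IdentityTranslator implements Translator {
    public IdentityTranslator() {
    }
  }

  /** Has no no-arg constructor, so reflective instantiation fails. */
  static final class BrokenTranslator implements Translator {
    public BrokenTranslator(String requiredArg) {
    }
  }

  static Translator load(Class<? extends Translator> clazz) throws IOException {
    try {
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      // Propagate instead of quietly substituting the identity translator,
      // so a misconfigured class name is noticed at startup.
      throw new IOException("Failed to instantiate translator " + clazz.getName(), e);
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(load(IdentityTranslator.class).getClass().getSimpleName());
    try {
      load(BrokenTranslator.class);
    } catch (IOException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```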
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkTranslator.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public interface ReplicationSinkTranslator {
Review Comment:
Better to add some javadoc to explain the meaning of the methods and the
usage of this class?
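
Something along these lines, for example. The javadoc wording is my guess at the intent based on how the two methods are called in ReplicationSink, so please correct it where it is wrong. `TableName` and `ExtendedCell` below are small stand-ins so the sketch compiles on its own; they are not the HBase classes.

```java
// Sketch only: stand-in types and guessed javadoc wording (assumptions).
final class TranslatorJavadocSketch {
  /** Stand-in for org.apache.hadoop.hbase.TableName. */
  static final class TableName {
    final String name;

    TableName(String name) {
      this.name = name;
    }
  }

  /** Stand-in for org.apache.hadoop.hbase.ExtendedCell. */
  static final class ExtendedCell {
    final String row;

    ExtendedCell(String row) {
      this.row = row;
    }
  }

  /**
   * Maps table names and cells received from the source cluster to the table
   * names and cells that the replication sink applies locally.
   */
  interface ReplicationSinkTranslator {
    /**
     * @param tableName table name as recorded in the WAL entry from the source
     * @return table name to write to on the sink cluster
     */
    TableName getSinkTableName(TableName tableName);

    /**
     * @param tableName source table name that the cell belongs to
     * @param cell cell as received from the source
     * @return cell to apply on the sink cluster
     */
    ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell);
  }

  /** Identity behavior: names and cells pass through unchanged. */
  static final class IdentityTranslator implements ReplicationSinkTranslator {
    @Override
    public TableName getSinkTableName(TableName tableName) {
      return tableName;
    }

    @Override
    public ExtendedCell getSinkExtendedCell(TableName tableName, ExtendedCell cell) {
      return cell;
    }
  }

  public static void main(String[] args) {
    ReplicationSinkTranslator t = new IdentityTranslator();
    System.out.println(t.getSinkTableName(new TableName("t1")).name);
  }
}
```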
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.