This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8e173851410 HDFS-17009. RBF: state store putAll should also return
failed records (#5664)
8e173851410 is described below
commit 8e173851410a529ed7d85600d17fc1f6580029df
Author: Viraj Jasani <[email protected]>
AuthorDate: Wed May 17 09:33:34 2023 -0700
HDFS-17009. RBF: state store putAll should also return failed records
(#5664)
---
.../store/driver/StateStoreOperationResult.java | 79 ++++++++++++++++++++++
.../store/driver/StateStoreRecordOperations.java | 5 +-
.../store/driver/impl/StateStoreBaseImpl.java | 2 +-
.../store/driver/impl/StateStoreFileBaseImpl.java | 27 +++++---
.../store/driver/impl/StateStoreMySQLImpl.java | 12 ++--
.../store/driver/impl/StateStoreZooKeeperImpl.java | 10 ++-
.../federation/store/impl/MountTableStoreImpl.java | 2 +-
.../store/FederationStateStoreTestUtils.java | 4 +-
.../store/driver/TestStateStoreDriverBase.java | 23 ++++++-
.../store/records/MockStateStoreDriver.java | 11 +--
10 files changed, 143 insertions(+), 32 deletions(-)
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreOperationResult.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreOperationResult.java
new file mode 100644
index 00000000000..02e54bfced2
--- /dev/null
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreOperationResult.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.store.driver;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * State store operation result with list of failed records.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class StateStoreOperationResult {
+
+ private final List<String> failedRecordsKeys;
+ private final boolean isOperationSuccessful;
+
+ private static final StateStoreOperationResult DEFAULT_OPERATION_SUCCESS_RESULT =
+ new StateStoreOperationResult(Collections.emptyList(), true);
+
+ /**
+ * State store operation result constructor with list of failed records keys and boolean
+ * to inform whether the overall operation is successful.
+ *
+ * @param failedRecordsKeys The list of failed records keys.
+ * @param isOperationSuccessful True if the operation was successful, False otherwise.
+ */
+ public StateStoreOperationResult(List<String> failedRecordsKeys,
+ boolean isOperationSuccessful) {
+ this.failedRecordsKeys = failedRecordsKeys;
+ this.isOperationSuccessful = isOperationSuccessful;
+ }
+
+ /**
+ * State store operation result constructor with a single failed record key.
+ *
+ * @param failedRecordKey The failed record key.
+ */
+ public StateStoreOperationResult(String failedRecordKey) {
+ if (failedRecordKey != null && failedRecordKey.length() > 0) {
+ this.isOperationSuccessful = false;
+ this.failedRecordsKeys = Collections.singletonList(failedRecordKey);
+ } else {
+ this.isOperationSuccessful = true;
+ this.failedRecordsKeys = Collections.emptyList();
+ }
+ }
+
+ public List<String> getFailedRecordsKeys() {
+ return failedRecordsKeys;
+ }
+
+ public boolean isOperationSuccessful() {
+ return isOperationSuccessful;
+ }
+
+ public static StateStoreOperationResult getDefaultSuccessResult() {
+ return DEFAULT_OPERATION_SUCCESS_RESULT;
+ }
+}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
index 04929d5fcc1..716f41daf4d 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreRecordOperations.java
@@ -107,12 +107,11 @@ public interface StateStoreRecordOperations {
* @param allowUpdate True if update of exiting record is allowed.
* @param errorIfExists True if an error should be returned when inserting
* an existing record. Only used if allowUpdate = false.
- * @return true if all operations were successful.
- *
+ * @return The result of the putAll operation.
* @throws IOException Throws exception if unable to query the data store.
*/
@AtMostOnce
- <T extends BaseRecord> boolean putAll(
+ <T extends BaseRecord> StateStoreOperationResult putAll(
List<T> records, boolean allowUpdate, boolean errorIfExists)
throws IOException;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
index f7a6174226e..df3ce21dee2 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java
@@ -75,7 +75,7 @@ public abstract class StateStoreBaseImpl extends
StateStoreDriver {
T record, boolean allowUpdate, boolean errorIfExists) throws IOException
{
List<T> singletonList = new ArrayList<>();
singletonList.add(record);
- return putAll(singletonList, allowUpdate, errorIfExists);
+ return putAll(singletonList, allowUpdate, errorIfExists).isOperationSuccessful();
}
@Override
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
index 4f8feee6093..a0f6fba9bac 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.java
@@ -47,6 +47,7 @@ import
org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import
org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
@@ -372,12 +373,12 @@ public abstract class StateStoreFileBaseImpl
}
@Override
- public <T extends BaseRecord> boolean putAll(
+ public <T extends BaseRecord> StateStoreOperationResult putAll(
List<T> records, boolean allowUpdate, boolean errorIfExists)
throws StateStoreUnavailableException {
verifyDriverReady();
if (records.isEmpty()) {
- return true;
+ return StateStoreOperationResult.getDefaultSuccessResult();
}
long start = monotonicNow();
@@ -402,7 +403,7 @@ public abstract class StateStoreFileBaseImpl
if (metrics != null) {
metrics.addFailure(monotonicNow() - start);
}
- return false;
+ return new StateStoreOperationResult(primaryKey);
} else {
LOG.debug("Not updating {}", record);
}
@@ -414,7 +415,9 @@ public abstract class StateStoreFileBaseImpl
// Write the records
final AtomicBoolean success = new AtomicBoolean(true);
final List<Callable<Void>> callables = new ArrayList<>();
- toWrite.entrySet().forEach(entry -> callables.add(() -> writeRecordToFile(success, entry)));
+ final List<String> failedRecordsKeys = Collections.synchronizedList(new ArrayList<>());
+ toWrite.entrySet().forEach(
+ entry -> callables.add(() -> writeRecordToFile(success, entry, failedRecordsKeys)));
if (this.concurrentStoreAccessPool != null) {
// Write records concurrently
List<Future<Void>> futures = null;
@@ -454,36 +457,40 @@ public abstract class StateStoreFileBaseImpl
metrics.addFailure(end - start);
}
}
- return success.get();
+ return new StateStoreOperationResult(failedRecordsKeys, success.get());
}
/**
* Writes the state store record to the file. At first, the record is
written to a temp location
* and then later renamed to the final location that is passed with the
entry key.
*
+ * @param <T> Record class of the records.
* @param success The atomic boolean that gets updated to false if the file
write operation fails.
* @param entry The entry of the record path and the state store record to
be written to the file
* by first writing to a temp location and then renaming it to the record
path.
- * @param <T> Record class of the records.
+ * @param failedRecordsList The list of paths of the failed records.
* @return Void.
*/
private <T extends BaseRecord> Void writeRecordToFile(AtomicBoolean success,
- Entry<String, T> entry) {
- String recordPath = entry.getKey();
- String recordPathTemp = recordPath + "." + now() + TMP_MARK;
+ Entry<String, T> entry, List<String> failedRecordsList) {
+ final String recordPath = entry.getKey();
+ final T record = entry.getValue();
+ final String primaryKey = getPrimaryKey(record);
+ final String recordPathTemp = recordPath + "." + now() + TMP_MARK;
boolean recordWrittenSuccessfully = true;
try (BufferedWriter writer = getWriter(recordPathTemp)) {
- T record = entry.getValue();
String line = serializeString(record);
writer.write(line);
} catch (IOException e) {
LOG.error("Cannot write {}", recordPathTemp, e);
recordWrittenSuccessfully = false;
+ failedRecordsList.add(primaryKey);
success.set(false);
}
// Commit
if (recordWrittenSuccessfully && !rename(recordPathTemp, recordPath)) {
LOG.error("Failed committing record into {}", recordPath);
+ failedRecordsList.add(primaryKey);
success.set(false);
}
return null;
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
index 9b32c883f54..bbeee8e40f2 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreMySQLImpl.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import
org.apache.hadoop.hdfs.server.federation.router.security.token.SQLConnectionFactory;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import
org.apache.hadoop.hdfs.server.federation.store.records.DisabledNameservice;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
@@ -161,10 +162,10 @@ public class StateStoreMySQLImpl extends
StateStoreSerializableImpl {
}
@Override
- public <T extends BaseRecord> boolean putAll(
+ public <T extends BaseRecord> StateStoreOperationResult putAll(
List<T> records, boolean allowUpdate, boolean errorIfExists) throws
IOException {
if (records.isEmpty()) {
- return true;
+ return StateStoreOperationResult.getDefaultSuccessResult();
}
verifyDriverReady();
@@ -173,6 +174,7 @@ public class StateStoreMySQLImpl extends
StateStoreSerializableImpl {
long start = Time.monotonicNow();
boolean success = true;
+ final List<String> failedRecordsKeys = new ArrayList<>();
for (T record : records) {
String tableName = getAndValidateTableNameForClass(record.getClass());
String primaryKey = getPrimaryKey(record);
@@ -185,6 +187,7 @@ public class StateStoreMySQLImpl extends
StateStoreSerializableImpl {
record.setDateModified(this.getTime());
if (!updateRecord(tableName, primaryKey, data)) {
LOG.error("Cannot write {} into table {}", primaryKey, tableName);
+ failedRecordsKeys.add(primaryKey);
success = false;
}
} else {
@@ -194,7 +197,7 @@ public class StateStoreMySQLImpl extends
StateStoreSerializableImpl {
if (metrics != null) {
metrics.addFailure(Time.monotonicNow() - start);
}
- return false;
+ return new StateStoreOperationResult(primaryKey);
} else {
LOG.debug("Not updating {} as updates are not allowed", record);
}
@@ -202,6 +205,7 @@ public class StateStoreMySQLImpl extends
StateStoreSerializableImpl {
} else {
if (!insertRecord(tableName, primaryKey, data)) {
LOG.error("Cannot write {} in table {}", primaryKey, tableName);
+ failedRecordsKeys.add(primaryKey);
success = false;
}
}
@@ -215,7 +219,7 @@ public class StateStoreMySQLImpl extends
StateStoreSerializableImpl {
metrics.addFailure(end - start);
}
}
- return success;
+ return new StateStoreOperationResult(failedRecordsKeys, success);
}
@Override
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
index 7882c8f8273..18d3e1a11d0 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
@@ -40,6 +41,7 @@ import
org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
@@ -230,11 +232,11 @@ public class StateStoreZooKeeperImpl extends
StateStoreSerializableImpl {
}
@Override
- public <T extends BaseRecord> boolean putAll(
+ public <T extends BaseRecord> StateStoreOperationResult putAll(
List<T> records, boolean update, boolean error) throws IOException {
verifyDriverReady();
if (records.isEmpty()) {
- return true;
+ return StateStoreOperationResult.getDefaultSuccessResult();
}
// All records should be the same
@@ -245,6 +247,7 @@ public class StateStoreZooKeeperImpl extends
StateStoreSerializableImpl {
long start = monotonicNow();
final AtomicBoolean status = new AtomicBoolean(true);
List<Callable<Void>> callables = new ArrayList<>();
+ final List<String> failedRecordsKeys = Collections.synchronizedList(new ArrayList<>());
records.forEach(record ->
callables.add(
() -> {
@@ -252,6 +255,7 @@ public class StateStoreZooKeeperImpl extends
StateStoreSerializableImpl {
String recordZNode = getNodePath(znode, primaryKey);
byte[] data = serialize(record);
if (!writeNode(recordZNode, data, update, error)) {
+ failedRecordsKeys.add(primaryKey);
status.set(false);
}
return null;
@@ -276,7 +280,7 @@ public class StateStoreZooKeeperImpl extends
StateStoreSerializableImpl {
} else {
getMetrics().addFailure(end - start);
}
- return status.get();
+ return new StateStoreOperationResult(failedRecordsKeys, status.get());
}
@Override
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
index b6428f7923b..b2a608ce933 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
@@ -145,7 +145,7 @@ public class MountTableStoreImpl extends MountTableStore {
final String src = mountTable.getSourcePath();
checkMountTablePermission(src);
}
- boolean status = getDriver().putAll(mountTables, false, true);
+ boolean status = getDriver().putAll(mountTables, false, true).isOperationSuccessful();
AddMountTableEntriesResponse response =
AddMountTableEntriesResponse.newInstance();
response.setStatus(status);
if (status) {
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
index 50840460a39..789458100ac 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
@@ -235,9 +235,7 @@ public final class FederationStateStoreTestUtils {
StateStoreDriver driver = stateStore.getDriver();
driver.verifyDriverReady();
if (driver.removeAll(clazz)) {
- if (driver.putAll(records, true, false)) {
- return true;
- }
+ return driver.putAll(records, true, false).isOperationSuccessful();
}
return false;
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
index 73d0774ace3..8b734305a20 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
@@ -307,7 +307,15 @@ public class TestStateStoreDriverBase {
}
// Verify
- assertTrue(driver.putAll(insertList, false, true));
+ StateStoreOperationResult result1 = driver.putAll(insertList, false, true);
+ assertTrue(result1.isOperationSuccessful());
+ assertEquals(0, result1.getFailedRecordsKeys().size());
+
+ StateStoreOperationResult result2 = driver.putAll(insertList.subList(0, 1), false, true);
+ assertFalse(result2.isOperationSuccessful());
+ assertEquals(1, result2.getFailedRecordsKeys().size());
+ assertEquals(getPrimaryKey(insertList.get(0)), result2.getFailedRecordsKeys().get(0));
+
records = driver.get(clazz);
assertEquals(records.getRecords().size(), 10);
@@ -384,7 +392,10 @@ public class TestStateStoreDriverBase {
}
// Verify
- assertTrue(driver.putAll(insertList, false, true));
+ StateStoreOperationResult result = driver.putAll(insertList, false, true);
+ assertTrue(result.isOperationSuccessful());
+ assertEquals(0, result.getFailedRecordsKeys().size());
+
records = driver.get(clazz);
assertEquals(records.getRecords().size(), 10);
@@ -689,4 +700,12 @@ public class TestStateStoreDriverBase {
}
return null;
}
+
+ private static String getPrimaryKey(BaseRecord record) {
+ String primaryKey = record.getPrimaryKey();
+ primaryKey = primaryKey.replaceAll("/", "0SLASH0");
+ primaryKey = primaryKey.replaceAll(":", "_");
+ return primaryKey;
+ }
+
}
\ No newline at end of file
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
index 9f600cb6f3f..d0821a1711b 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
@@ -18,11 +18,13 @@
package org.apache.hadoop.hdfs.server.federation.store.records;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
import
org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreBaseImpl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -89,10 +91,9 @@ public class MockStateStoreDriver extends StateStoreBaseImpl
{
}
@Override
- public <T extends BaseRecord> boolean putAll(List<T> records,
- boolean allowUpdate,
- boolean errorIfExists)
- throws IOException {
+ public <T extends BaseRecord> StateStoreOperationResult putAll(List<T> records,
+ boolean allowUpdate,
+ boolean errorIfExists) throws IOException {
checkErrors();
for (T record : records) {
Map<String, BaseRecord> map =
@@ -107,7 +108,7 @@ public class MockStateStoreDriver extends
StateStoreBaseImpl {
+ ": " + key);
}
}
- return true;
+ return new StateStoreOperationResult(Collections.emptyList(), true);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]