Repository: asterixdb
Updated Branches:
  refs/heads/master d8694e29c -> 77109ea4b


[ASTERIXDB-1985][CLUS] Add rebalance callback

Change-Id: I9a90ba975467c136371236195f82d48430d8319d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1863
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>
BAD: Jenkins <jenk...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/77109ea4
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/77109ea4
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/77109ea4

Branch: refs/heads/master
Commit: 77109ea4b71cad1285206b08eecd6427885eb24f
Parents: d8694e2
Author: Yingyi Bu <yin...@couchbase.com>
Authored: Thu Jul 13 13:34:38 2017 -0700
Committer: Till Westmann <ti...@apache.org>
Committed: Thu Jul 13 18:38:47 2017 -0700

----------------------------------------------------------------------
 .../api/http/server/ConnectorApiServlet.java    |  2 +-
 .../api/http/server/RebalanceApiServlet.java    |  3 +-
 .../asterix/app/translator/QueryTranslator.java | 21 +++----
 .../rebalance/IDatasetRebalanceCallback.java    | 64 ++++++++++++++++++++
 .../rebalance/NoOpDatasetRebalanceCallback.java | 47 ++++++++++++++
 .../apache/asterix/utils/FlushDatasetUtil.java  | 17 ++++--
 .../org/apache/asterix/utils/RebalanceUtil.java | 42 +++++++++++--
 .../asterix/metadata/entities/Dataset.java      | 21 +++----
 .../lsm/common/utils/ComponentMetadataUtil.java | 18 ++++++
 9 files changed, 199 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77109ea4/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index f1a123c..03958ed 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -121,7 +121,7 @@ public class ConnectorApiServlet extends AbstractServlet {
                         hcc.getNodeControllerInfos());
 
                 // Flush the cached contents of the dataset to file system.
-                FlushDatasetUtil.flushDataset(hcc, metadataProvider, 
dataverseName, datasetName, datasetName);
+                FlushDatasetUtil.flushDataset(hcc, metadataProvider, 
dataverseName, datasetName);
 
                 // Metadata transaction commits.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77109ea4/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index 3bd1be5..e9d231a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -43,6 +43,7 @@ import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.rebalance.NoOpDatasetRebalanceCallback;
 import org.apache.asterix.utils.RebalanceUtil;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
@@ -242,7 +243,7 @@ public class RebalanceApiServlet extends AbstractServlet {
         IHyracksClientConnection hcc = (IHyracksClientConnection) 
ctx.get(HYRACKS_CONNECTION_ATTR);
         MetadataProvider metadataProvider = new MetadataProvider(appCtx, null, 
new StorageComponentProvider());
         RebalanceUtil.rebalance(dataverseName, datasetName, new 
LinkedHashSet<>(Arrays.asList(targetNodes)),
-                metadataProvider, hcc);
+                metadataProvider, hcc, NoOpDatasetRebalanceCallback.INSTANCE);
     }
 
     // Sends HTTP response to the request client.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77109ea4/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2967a38..bd5c024 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -34,9 +34,9 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -55,12 +55,12 @@ import org.apache.asterix.api.http.server.ResultUtil;
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.config.ExternalProperties;
+import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.DatasetConfig.TransactionState;
-import org.apache.asterix.common.config.ExternalProperties;
-import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -159,16 +159,16 @@ import org.apache.asterix.om.types.TypeSignature;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
-import 
org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
-import 
org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
-import 
org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
-import 
org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
-import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.SessionConfig;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.asterix.translator.TypeTranslator;
+import 
org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
+import 
org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
+import 
org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
+import 
org.apache.asterix.translator.CompiledStatements.CompiledUpsertStatement;
+import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement;
 import org.apache.asterix.translator.util.ValidateUtil;
 import org.apache.asterix.utils.DataverseUtil;
 import org.apache.asterix.utils.FeedOperations;
@@ -967,8 +967,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
 
             // #. flush the internal dataset for correlated policy
             if (ds.isCorrelated() && ds.getDatasetType() == 
DatasetType.INTERNAL) {
-                FlushDatasetUtil.flushDataset(hcc, metadataProvider, 
index.getDataverseName(), index.getDatasetName(),
-                        index.getDatasetName());
+                FlushDatasetUtil.flushDataset(hcc, metadataProvider, 
index.getDataverseName(), index.getDatasetName());
             }
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
@@ -2821,7 +2820,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
 
         // Flushes source dataset.
-        FlushDatasetUtil.flushDataset(hcc, metadataProvider, 
dataverseNameFrom, datasetNameFrom, datasetNameFrom);
+        FlushDatasetUtil.flushDataset(hcc, metadataProvider, 
dataverseNameFrom, datasetNameFrom);
     }
 
     // Executes external shell commands.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77109ea4/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
new file mode 100644
index 0000000..e8683c9
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.rebalance;
+
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * This interface is used for customizing the before/after operation for 
rebalance.
+ */
+public interface IDatasetRebalanceCallback {
+
+    /**
+     * The action to perform before the target dataset is populated.
+     *
+     * @param metadataProvider,
+     *            the metadata provider.
+     * @param source,
+     *            the source dataset.
+     * @param target,
+     *            the target dataset.
+     * @param hcc,
+     *            the hyracks client connection.
+     * @throws HyracksDataException
+     */
+    void beforeRebalance(MetadataProvider metadataProvider, Dataset source, 
Dataset target,
+            IHyracksClientConnection hcc) throws HyracksDataException;
+
+    /**
+     * The action to perform after the target dataset is populated.
+     *
+     * @param metadataProvider,
+     *            the metadata provider.
+     * @param source,
+     *            the source dataset.
+     * @param target,
+     *            the target dataset.
+     * @param hcc,
+     *            the hyracks client connection.
+     * @throws HyracksDataException
+     */
+    void afterRebalance(MetadataProvider metadataProvider, Dataset source, 
Dataset target, IHyracksClientConnection hcc)
+            throws HyracksDataException;
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77109ea4/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
new file mode 100644
index 0000000..680adbf
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.rebalance;
+
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+
+// A callback that performs no action before or after a rebalance.
+public class NoOpDatasetRebalanceCallback implements IDatasetRebalanceCallback 
{
+
+    public static final NoOpDatasetRebalanceCallback INSTANCE = new 
NoOpDatasetRebalanceCallback();
+
+    private NoOpDatasetRebalanceCallback() {
+
+    }
+
+    @Override
+    public void beforeRebalance(MetadataProvider metadataProvider, Dataset 
source, Dataset target,
+            IHyracksClientConnection hcc) {
+        // Does nothing.
+    }
+
+    @Override
+    public void afterRebalance(MetadataProvider metadataProvider, Dataset 
source, Dataset target,
+            IHyracksClientConnection hcc) {
+        // Does nothing.
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77109ea4/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
index 5445986..958444c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FlushDatasetUtil.java
@@ -44,7 +44,13 @@ public class FlushDatasetUtil {
     }
 
     public static void flushDataset(IHyracksClientConnection hcc, 
MetadataProvider metadataProvider,
-            String dataverseName, String datasetName, String indexName) throws 
Exception {
+            String dataverseName, String datasetName) throws Exception {
+        Dataset dataset = metadataProvider.findDataset(dataverseName, 
datasetName);
+        flushDataset(hcc, metadataProvider, dataset);
+    }
+
+    public static void flushDataset(IHyracksClientConnection hcc, 
MetadataProvider metadataProvider, Dataset dataset)
+            throws Exception {
         CompilerProperties compilerProperties = 
metadataProvider.getApplicationContext().getCompilerProperties();
         int frameSize = compilerProperties.getFrameSize();
         JobSpecification spec = new JobSpecification(frameSize);
@@ -54,14 +60,13 @@ public class FlushDatasetUtil {
                 new IPushRuntimeFactory[] { new 
EmptyTupleSourceRuntimeFactory() }, rDescs);
 
         org.apache.asterix.common.transactions.JobId jobId = 
JobIdFactory.generateJobId();
-        Dataset dataset = metadataProvider.findDataset(dataverseName, 
datasetName);
-        FlushDatasetOperatorDescriptor flushOperator =
-                new FlushDatasetOperatorDescriptor(spec, jobId, 
dataset.getDatasetId());
+        FlushDatasetOperatorDescriptor flushOperator = new 
FlushDatasetOperatorDescriptor(spec, jobId,
+                dataset.getDatasetId());
 
         spec.connect(new OneToOneConnectorDescriptor(spec), emptySource, 0, 
flushOperator, 0);
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
primarySplitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset, 
indexName);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
primarySplitsAndConstraint = metadataProvider
+                .getSplitProviderAndConstraints(dataset, 
dataset.getDatasetName());
         AlgebricksPartitionConstraint primaryPartitionConstraint = 
primarySplitsAndConstraint.second;
 
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
emptySource,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77109ea4/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 3b17a94..1ed37e0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -42,6 +42,7 @@ import org.apache.asterix.metadata.lock.LockList;
 import org.apache.asterix.metadata.lock.MetadataLockManager;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.metadata.utils.IndexUtil;
+import org.apache.asterix.rebalance.IDatasetRebalanceCallback;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import 
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -85,7 +86,8 @@ public class RebalanceUtil {
      * @throws Exception
      */
     public static void rebalance(String dataverseName, String datasetName, 
Set<String> targetNcNames,
-            MetadataProvider metadataProvider, IHyracksClientConnection hcc) 
throws Exception {
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc,
+            IDatasetRebalanceCallback datasetRebalanceCallback) throws 
Exception {
         Dataset sourceDataset;
         Dataset targetDataset;
         // Executes the first Metadata transaction.
@@ -115,10 +117,11 @@ public class RebalanceUtil {
                     metadataProvider);
 
             // The target dataset for rebalance.
-            targetDataset = new Dataset(sourceDataset, true, nodeGroupName);
+            targetDataset = 
sourceDataset.getTargetDatasetForRebalance(nodeGroupName);
+
 
             // Rebalances the source dataset into the target dataset.
-            rebalance(sourceDataset, targetDataset, metadataProvider, hcc);
+            rebalance(sourceDataset, targetDataset, metadataProvider, hcc, 
datasetRebalanceCallback);
 
             // Complete the metadata transaction.
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -192,10 +195,13 @@ public class RebalanceUtil {
 
     // Rebalances from the source to the target.
     private static void rebalance(Dataset source, Dataset target, 
MetadataProvider metadataProvider,
-            IHyracksClientConnection hcc) throws Exception {
+            IHyracksClientConnection hcc, IDatasetRebalanceCallback 
datasetRebalanceCallback) throws Exception {
         // Drops the target dataset files (if any) to make rebalance 
idempotent.
         dropDatasetFiles(target, metadataProvider, hcc);
 
+        // Performs the specified operation before the target dataset is 
populated.
+        datasetRebalanceCallback.beforeRebalance(metadataProvider, source, 
target, hcc);
+
         // Creates the rebalance target.
         createRebalanceTarget(target, metadataProvider, hcc);
 
@@ -204,6 +210,9 @@ public class RebalanceUtil {
 
         // Creates and loads indexes for the rebalance target.
         createAndLoadSecondaryIndexesForTarget(source, target, 
metadataProvider, hcc);
+
+        // Performs the specified operation after the target dataset is 
populated.
+        datasetRebalanceCallback.afterRebalance(metadataProvider, source, 
target, hcc);
     }
 
     // Switches the metadata entity from the source dataset to the target 
dataset.
@@ -305,8 +314,7 @@ public class RebalanceUtil {
     // Creates the commit operator for populating the target dataset.
     private static IOperatorDescriptor createUpsertCommitOp(JobSpecification 
spec, MetadataProvider metadataProvider,
             JobId jobId, Dataset target) throws AlgebricksException {
-        int numKeys = target.getPrimaryKeys().size();
-        int[] primaryKeyFields = IntStream.range(0, numKeys).toArray();
+        int[] primaryKeyFields = getPrimaryKeyPermutationForUpsert(target);
         return new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
                 new IPushRuntimeFactory[] {
                         target.getCommitRuntimeFactory(metadataProvider, 
jobId, primaryKeyFields, true) },
@@ -351,4 +359,26 @@ public class RebalanceUtil {
             JobUtils.runJob(hcc, indexLoadingJobSpec, true);
         }
     }
+
+    // Gets the primary key permutation for upserts.
+    private static int[] getPrimaryKeyPermutationForUpsert(Dataset dataset) {
+        // prev record first
+        int f = 1;
+        // add the previous meta second
+        if (dataset.hasMetaPart()) {
+            f++;
+        }
+        // add the previous filter third
+        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 
: 1;
+        if (numFilterFields > 0) {
+            f++;
+        }
+        int numPrimaryKeys = dataset.getPrimaryKeys().size();
+        int[] pkIndexes = new int[numPrimaryKeys];
+        for (int i = 0; i < pkIndexes.length; i++) {
+            pkIndexes[i] = f;
+            f++;
+        }
+        return pkIndexes;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77109ea4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 9131692..020ff6c 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -40,8 +40,8 @@ import 
org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallb
 import 
org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
 import 
org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.metadata.IDataset;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -67,7 +67,6 @@ import 
org.apache.asterix.metadata.utils.RTreeResourceFactoryProvider;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.utils.RecordUtil;
-import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import 
org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
@@ -78,6 +77,7 @@ import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearc
 import 
org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
+import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import 
org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
 import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
 import 
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
@@ -176,15 +176,6 @@ public class Dataset implements IMetadataEntity<Dataset>, 
IDataset {
                 dataset.hints, dataset.datasetType, dataset.datasetId, 
dataset.pendingOp, dataset.rebalanceCount);
     }
 
-    public Dataset(Dataset dataset, boolean forRebalance, String 
targetNodeGroupName) {
-        this(dataset.dataverseName, dataset.datasetName, 
dataset.recordTypeDataverseName, dataset.recordTypeName,
-                dataset.metaTypeDataverseName, dataset.metaTypeName, 
targetNodeGroupName,
-                dataset.compactionPolicyFactory,
-                dataset.compactionPolicyProperties, dataset.datasetDetails, 
dataset.hints, dataset.datasetType,
-                forRebalance ? 
DatasetIdFactory.generateAlternatingDatasetId(dataset.datasetId) : 
dataset.datasetId,
-                dataset.pendingOp, forRebalance ? dataset.rebalanceCount + 1 : 
dataset.rebalanceCount);
-    }
-
     public Dataset(String dataverseName, String datasetName, String 
itemTypeDataverseName, String itemTypeName,
             String metaItemTypeDataverseName, String metaItemTypeName, String 
nodeGroupName, String compactionPolicy,
             Map<String, String> compactionPolicyProperties, IDatasetDetails 
datasetDetails, Map<String, String> hints,
@@ -801,6 +792,14 @@ public class Dataset implements IMetadataEntity<Dataset>, 
IDataset {
         return IntStream.range(0, numPrimaryKeys).toArray();
     }
 
+    // Gets the target dataset for the purpose of rebalance.
+    public Dataset getTargetDatasetForRebalance(String targetNodeGroupName) {
+        return new Dataset(this.dataverseName, this.datasetName, 
this.recordTypeDataverseName, this.recordTypeName,
+                this.metaTypeDataverseName, this.metaTypeName, 
targetNodeGroupName, this.compactionPolicyFactory,
+                this.compactionPolicyProperties, this.datasetDetails, 
this.hints, this.datasetType,
+                DatasetIdFactory.generateAlternatingDatasetId(this.datasetId), 
this.pendingOp, this.rebalanceCount + 1);
+    }
+
     // Gets an array of partition numbers for this dataset.
     protected int[] getDatasetPartitions(MetadataProvider metadataProvider) 
throws AlgebricksException {
         FileSplit[] splitsForDataset = 
metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/77109ea4/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
index b55e8ad..6ccbc8d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
@@ -85,6 +85,24 @@ public class ComponentMetadataUtil {
         }
     }
 
+    /**
+     * Put LSM metadata state into the index's current memory component.
+     *
+     * @param index,
+     *            the LSM index.
+     * @param key,
+     *            the key for the metadata state.
+     * @param pointable,
+     *            the value for the metadata state.
+     * @throws HyracksDataException
+     */
+    public static void put(ILSMIndex index, IValueReference key, IPointable 
pointable) throws HyracksDataException {
+        // synchronize on the opTracker to ensure the component layout doesn't change
+        synchronized (index.getOperationTracker()) {
+            index.getCurrentMemoryComponent().getMetadata().put(key, 
pointable);
+        }
+    }
+
     private static void fromDiskComponents(ILSMIndex index, IValueReference 
key, IPointable pointable)
             throws HyracksDataException {
         for (ILSMDiskComponent c : index.getImmutableComponents()) {

Reply via email to