Murtadha Hubail has submitted this change and it was merged.
(https://asterix-gerrit.ics.uci.edu/3432)

Change subject: [NO ISSUE][OTH] Simplify ResultJobRecord APIs
......................................................................

[NO ISSUE][OTH] Simplify ResultJobRecord APIs

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Simplify ResultJobRecord APIs by allowing only a single ResultSetId
  per ResultJobRecord (i.e. per job).
- Fail result partition registration when a job attempts to use
  multiple ResultSetIds or an inconsistent number of partitions,
  and log the inconsistency.
- Delete the test ReplicateOperatorTest, which duplicates
  the test PushRuntimeTest#scanReplicateWrite.

Change-Id: I37816efc92ee9f5e66f29ce74dec4c6c5bd07c6f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3432
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>
---
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
D 
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
3 files changed, 37 insertions(+), 155 deletions(-)

Approvals:
  Jenkins: Verified; ; Verified
  Anon. E. Moose (1000171):
  Murtadha Hubail: Looks good to me, but someone else must approve
  Till Westmann: Looks good to me, approved

Objections:
  Jenkins: Violations found



diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
index b3b0706..02762ee 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
@@ -21,12 +21,12 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;

 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

 public class ResultJobRecord implements IResultStateRecord {

@@ -77,15 +77,13 @@
     }

     private static final long serialVersionUID = 1L;
-
+    private static final Logger LOGGER = LogManager.getLogger();
     private final long timestamp;
     private long jobStartTime;
     private long jobEndTime;
     private Status status;
-
+    private ResultSetId rsId;
     private ResultSetMetaData resultSetMetaData;
-
-    private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new 
HashMap<>();

     public ResultJobRecord() {
         this.timestamp = System.nanoTime();
@@ -116,10 +114,6 @@
         updateState(State.SUCCESS);
     }

-    public void fail(ResultSetId rsId, int partition) {
-        getOrCreateDirectoryRecord(rsId, partition).fail();
-    }
-
     public void fail(List<Exception> exceptions) {
         updateState(State.FAILED);
         status.setExceptions(exceptions);
@@ -139,47 +133,40 @@
         StringBuilder sb = new StringBuilder();
         sb.append("{ \"status\": ").append(status.toString()).append(", ");
         sb.append("\"timestamp\": ").append(timestamp).append(", ");
-        sb.append("\"resultsets\": 
").append(Arrays.toString(resultSetMetadataMap.entrySet().toArray())).append(" 
}");
+        sb.append("\"resultset\": ").append(resultSetMetaData).append(" }");
         return sb.toString();
     }

     public synchronized void setResultSetMetaData(ResultSetId rsId, 
IResultMetadata metadata, int nPartitions)
             throws HyracksDataException {
-        ResultSetMetaData rsMd = resultSetMetadataMap.get(rsId);
-        if (rsMd == null) {
-            final ResultSetMetaData resultSetMetaData = new 
ResultSetMetaData(nPartitions, metadata);
-            resultSetMetadataMap.put(rsId, resultSetMetaData);
-            this.resultSetMetaData = resultSetMetaData;
-        } else if (rsMd.getRecords().length != nPartitions) {
-            throw 
HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, 
rsId.toString());
+        if (this.rsId == null) {
+            this.rsId = rsId;
+            this.resultSetMetaData = new ResultSetMetaData(nPartitions, 
metadata);
+        } else if (!this.rsId.equals(rsId) || 
resultSetMetaData.getRecords().length != nPartitions) {
+            logInconsistentMetadata(rsId, nPartitions);
+            throw 
HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, 
this.rsId.toString());
         }
-        //TODO(tillw) throwing a HyracksDataException here hangs the execution 
tests
     }

-    public ResultSetMetaData getResultSetMetaData(ResultSetId rsId) {
-        return resultSetMetadataMap.get(rsId);
-    }
-
-    public synchronized ResultDirectoryRecord 
getOrCreateDirectoryRecord(ResultSetId rsId, int partition) {
-        ResultDirectoryRecord[] records = 
getResultSetMetaData(rsId).getRecords();
+    public synchronized ResultDirectoryRecord getOrCreateDirectoryRecord(int 
partition) {
+        ResultDirectoryRecord[] records = resultSetMetaData.getRecords();
         if (records[partition] == null) {
             records[partition] = new ResultDirectoryRecord();
         }
         return records[partition];
     }

-    public synchronized ResultDirectoryRecord getDirectoryRecord(ResultSetId 
rsId, int partition)
-            throws HyracksDataException {
-        ResultDirectoryRecord[] records = 
getResultSetMetaData(rsId).getRecords();
+    public synchronized ResultDirectoryRecord getDirectoryRecord(int 
partition) throws HyracksDataException {
+        ResultDirectoryRecord[] records = resultSetMetaData.getRecords();
         if (records[partition] == null) {
             throw HyracksDataException.create(ErrorCode.RESULT_NO_RECORD, 
partition, rsId);
         }
         return records[partition];
     }

-    public synchronized void updateState(ResultSetId rsId) {
+    public synchronized void updateState() {
         int successCount = 0;
-        ResultDirectoryRecord[] records = 
getResultSetMetaData(rsId).getRecords();
+        ResultDirectoryRecord[] records = resultSetMetaData.getRecords();
         for (ResultDirectoryRecord record : records) {
             if ((record != null) && (record.getStatus() == 
ResultDirectoryRecord.Status.SUCCESS)) {
                 successCount++;
@@ -193,4 +180,18 @@
     public synchronized ResultSetMetaData getResultSetMetaData() {
         return resultSetMetaData;
     }
+
+    private void logInconsistentMetadata(ResultSetId rsId, int nPartitions) {
+        if (LOGGER.isWarnEnabled()) {
+            LOGGER.warn("inconsistent result metadata for result set {}", 
this.rsId);
+            if (!this.rsId.equals(rsId)) {
+                LOGGER.warn("inconsistent result set id. Current {}, new {}", 
this.rsId, rsId);
+            }
+            final int expectedPartitions = 
resultSetMetaData.getRecords().length;
+            if (expectedPartitions != nPartitions) {
+                LOGGER.warn("inconsistent result set number of partitions. 
Current {}, new {}", expectedPartitions,
+                        nPartitions);
+            }
+        }
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index a2218e2..bfecc48 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -121,7 +121,7 @@
             throws HyracksDataException {
         ResultJobRecord djr = getNonNullResultJobRecord(jobId);
         djr.setResultSetMetaData(rsId, metadata, nPartitions);
-        ResultDirectoryRecord record = djr.getOrCreateDirectoryRecord(rsId, 
partition);
+        ResultDirectoryRecord record = 
djr.getOrCreateDirectoryRecord(partition);

         record.setNetworkAddress(networkAddress);
         record.setEmpty(emptyResult);
@@ -147,8 +147,8 @@
     public synchronized void reportResultPartitionWriteCompletion(JobId jobId, 
ResultSetId rsId, int partition)
             throws HyracksDataException {
         ResultJobRecord djr = getNonNullResultJobRecord(jobId);
-        djr.getDirectoryRecord(rsId, partition).writeEOS();
-        djr.updateState(rsId);
+        djr.getDirectoryRecord(partition).writeEOS();
+        djr.updateState();
         notifyAll();
     }

@@ -178,7 +178,7 @@

     @Override
     public synchronized IResultMetadata getResultMetadata(JobId jobId, 
ResultSetId rsId) throws HyracksDataException {
-        return 
getNonNullResultJobRecord(jobId).getResultSetMetaData(rsId).getMetadata();
+        return 
getNonNullResultJobRecord(jobId).getResultSetMetaData().getMetadata();
     }

     @Override
@@ -228,7 +228,6 @@
     private ResultDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId 
rsId, ResultDirectoryRecord[] knownRecords)
             throws HyracksDataException {
         ResultJobRecord djr = getNonNullResultJobRecord(jobId);
-
         if (djr.getStatus().getState() == State.FAILED) {
             List<Exception> caughtExceptions = djr.getStatus().getExceptions();
             if (caughtExceptions != null && !caughtExceptions.isEmpty()) {
@@ -241,13 +240,11 @@
                 throw 
HyracksDataException.create(ErrorCode.RESULT_FAILURE_NO_EXCEPTION, rsId, jobId);
             }
         }
-
-        final ResultSetMetaData resultSetMetaData = 
djr.getResultSetMetaData(rsId);
+        final ResultSetMetaData resultSetMetaData = djr.getResultSetMetaData();
         if (resultSetMetaData == null) {
             return null;
         }
         ResultDirectoryRecord[] records = resultSetMetaData.getRecords();
-
         return Arrays.equals(records, knownRecords) ? null : records;
     }

@@ -255,7 +252,7 @@
         for (JobId jId : getJobIds()) {
             pw.print(jId.toString());
             pw.print(" - ");
-            pw.println(String.valueOf(getResultJobRecord(jId)));
+            pw.println(getResultJobRecord(jId));
         }
         pw.flush();
         return pw;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
deleted file mode 100644
index 0d4d0e8..0000000
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.tests.integration;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.io.ManagedFileSplit;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.api.result.ResultSetId;
-import 
org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
-import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ReplicateOperatorTest extends AbstractIntegrationTest {
-
-    public void compareFiles(String fileNameA, String fileNameB) throws 
IOException {
-        BufferedReader fileA = new BufferedReader(new FileReader(fileNameA));
-        BufferedReader fileB = new BufferedReader(new FileReader(fileNameB));
-
-        String lineA, lineB;
-        while ((lineA = fileA.readLine()) != null) {
-            lineB = fileB.readLine();
-            Assert.assertEquals(lineA, lineB);
-        }
-        Assert.assertNull(fileB.readLine());
-        fileA.close();
-        fileB.close();
-    }
-
-    @Test
-    public void test() throws Exception {
-        final int outputArity = 2;
-
-        JobSpecification spec = new JobSpecification();
-
-        String inputFileName = "data" + File.separator + "nc1" + 
File.separator + "words.txt";
-        File[] outputFile = new File[outputArity];
-        for (int i = 0; i < outputArity; i++) {
-            outputFile[i] = File.createTempFile("replicateop", null);
-            outputFile[i].deleteOnExit();
-        }
-
-        FileSplit[] inputSplits = new FileSplit[] { new 
ManagedFileSplit(NC1_ID, inputFileName) };
-
-        String[] locations = new String[] { NC1_ID };
-
-        DelimitedDataTupleParserFactory stringParser = new 
DelimitedDataTupleParserFactory(
-                new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE 
}, '\u0000');
-        RecordDescriptor stringRec =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(), });
-
-        FileScanOperatorDescriptor scanOp = new 
FileScanOperatorDescriptor(spec,
-                new ConstantFileSplitProvider(inputSplits), stringParser, 
stringRec);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, 
locations);
-
-        ReplicateOperatorDescriptor replicateOp = new 
ReplicateOperatorDescriptor(spec, stringRec, outputArity);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
replicateOp, locations);
-
-        IOperatorDescriptor outputOp[] = new 
IOperatorDescriptor[outputFile.length];
-        for (int i = 0; i < outputArity; i++) {
-            ResultSetId rsId = new ResultSetId(i);
-            spec.addResultSetId(rsId);
-
-            outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, null, 
false,
-                    
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 
1);
-            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
outputOp[i], locations);
-        }
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, 
replicateOp, 0);
-        for (int i = 0; i < outputArity; i++) {
-            spec.connect(new OneToOneConnectorDescriptor(spec), replicateOp, 
i, outputOp[i], 0);
-        }
-
-        for (int i = 0; i < outputArity; i++) {
-            spec.addRoot(outputOp[i]);
-        }
-        String[] expectedResultsFileNames = new String[outputArity];
-        for (int i = 0; i < outputArity; i++) {
-            expectedResultsFileNames[i] = "data" + File.separator + "device0" 
+ File.separator + inputFileName;
-        }
-        runTestAndCompareResults(spec, expectedResultsFileNames);
-    }
-}

--
To view, visit https://asterix-gerrit.ics.uci.edu/3432
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: merged
Gerrit-Change-Id: I37816efc92ee9f5e66f29ce74dec4c6c5bd07c6f
Gerrit-Change-Number: 3432
Gerrit-PatchSet: 3
Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Anon. E. Moose (1000171)
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>

Reply via email to