Murtadha Hubail has submitted this change and it was merged. ( https://asterix-gerrit.ics.uci.edu/3432 )
Change subject: [NO ISSUE][OTH] Simplify ResultJobRecord APIs
......................................................................

[NO ISSUE][OTH] Simplify ResultJobRecord APIs

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Simplify ResultJobRecord APIs by allowing only a single ResultSetId per ResultJobRecord (i.e. per job).
- Fail result partition registration when a job attempts to use multiple ResultSetIds or inconsistent number of partitions and log the inconsistency.
- Delete test ReplicateOperatorTest which duplicates the test PushRuntimeTest#scanReplicateWrite.

Change-Id: I37816efc92ee9f5e66f29ce74dec4c6c5bd07c6f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3432
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>
---
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
D hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
3 files changed, 37 insertions(+), 155 deletions(-)

Approvals:
  Jenkins: Verified; ; Verified
  Anon. E. 
Moose (1000171): Murtadha Hubail: Looks good to me, but someone else must approve Till Westmann: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java index b3b0706..02762ee 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/result/ResultJobRecord.java @@ -21,12 +21,12 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class ResultJobRecord implements IResultStateRecord { @@ -77,15 +77,13 @@ } private static final long serialVersionUID = 1L; - + private static final Logger LOGGER = LogManager.getLogger(); private final long timestamp; private long jobStartTime; private long jobEndTime; private Status status; - + private ResultSetId rsId; private ResultSetMetaData resultSetMetaData; - - private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>(); public ResultJobRecord() { this.timestamp = System.nanoTime(); @@ -116,10 +114,6 @@ updateState(State.SUCCESS); } - public void fail(ResultSetId rsId, int partition) { - getOrCreateDirectoryRecord(rsId, partition).fail(); - } - public void fail(List<Exception> exceptions) { updateState(State.FAILED); status.setExceptions(exceptions); @@ -139,47 +133,40 @@ StringBuilder sb = new StringBuilder(); sb.append("{ \"status\": ").append(status.toString()).append(", "); sb.append("\"timestamp\": ").append(timestamp).append(", "); 
- sb.append("\"resultsets\": ").append(Arrays.toString(resultSetMetadataMap.entrySet().toArray())).append(" }"); + sb.append("\"resultset\": ").append(resultSetMetaData).append(" }"); return sb.toString(); } public synchronized void setResultSetMetaData(ResultSetId rsId, IResultMetadata metadata, int nPartitions) throws HyracksDataException { - ResultSetMetaData rsMd = resultSetMetadataMap.get(rsId); - if (rsMd == null) { - final ResultSetMetaData resultSetMetaData = new ResultSetMetaData(nPartitions, metadata); - resultSetMetadataMap.put(rsId, resultSetMetaData); - this.resultSetMetaData = resultSetMetaData; - } else if (rsMd.getRecords().length != nPartitions) { - throw HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, rsId.toString()); + if (this.rsId == null) { + this.rsId = rsId; + this.resultSetMetaData = new ResultSetMetaData(nPartitions, metadata); + } else if (!this.rsId.equals(rsId) || resultSetMetaData.getRecords().length != nPartitions) { + logInconsistentMetadata(rsId, nPartitions); + throw HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, this.rsId.toString()); } - //TODO(tillw) throwing a HyracksDataException here hangs the execution tests } - public ResultSetMetaData getResultSetMetaData(ResultSetId rsId) { - return resultSetMetadataMap.get(rsId); - } - - public synchronized ResultDirectoryRecord getOrCreateDirectoryRecord(ResultSetId rsId, int partition) { - ResultDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords(); + public synchronized ResultDirectoryRecord getOrCreateDirectoryRecord(int partition) { + ResultDirectoryRecord[] records = resultSetMetaData.getRecords(); if (records[partition] == null) { records[partition] = new ResultDirectoryRecord(); } return records[partition]; } - public synchronized ResultDirectoryRecord getDirectoryRecord(ResultSetId rsId, int partition) - throws HyracksDataException { - ResultDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords(); + public 
synchronized ResultDirectoryRecord getDirectoryRecord(int partition) throws HyracksDataException { + ResultDirectoryRecord[] records = resultSetMetaData.getRecords(); if (records[partition] == null) { throw HyracksDataException.create(ErrorCode.RESULT_NO_RECORD, partition, rsId); } return records[partition]; } - public synchronized void updateState(ResultSetId rsId) { + public synchronized void updateState() { int successCount = 0; - ResultDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords(); + ResultDirectoryRecord[] records = resultSetMetaData.getRecords(); for (ResultDirectoryRecord record : records) { if ((record != null) && (record.getStatus() == ResultDirectoryRecord.Status.SUCCESS)) { successCount++; @@ -193,4 +180,18 @@ public synchronized ResultSetMetaData getResultSetMetaData() { return resultSetMetaData; } + + private void logInconsistentMetadata(ResultSetId rsId, int nPartitions) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("inconsistent result metadata for result set {}", this.rsId); + if (!this.rsId.equals(rsId)) { + LOGGER.warn("inconsistent result set id. Current {}, new {}", this.rsId, rsId); + } + final int expectedPartitions = resultSetMetaData.getRecords().length; + if (expectedPartitions != nPartitions) { + LOGGER.warn("inconsistent result set number of partitions. 
Current {}, new {}", expectedPartitions, + nPartitions); + } + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java index a2218e2..bfecc48 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java @@ -121,7 +121,7 @@ throws HyracksDataException { ResultJobRecord djr = getNonNullResultJobRecord(jobId); djr.setResultSetMetaData(rsId, metadata, nPartitions); - ResultDirectoryRecord record = djr.getOrCreateDirectoryRecord(rsId, partition); + ResultDirectoryRecord record = djr.getOrCreateDirectoryRecord(partition); record.setNetworkAddress(networkAddress); record.setEmpty(emptyResult); @@ -147,8 +147,8 @@ public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksDataException { ResultJobRecord djr = getNonNullResultJobRecord(jobId); - djr.getDirectoryRecord(rsId, partition).writeEOS(); - djr.updateState(rsId); + djr.getDirectoryRecord(partition).writeEOS(); + djr.updateState(); notifyAll(); } @@ -178,7 +178,7 @@ @Override public synchronized IResultMetadata getResultMetadata(JobId jobId, ResultSetId rsId) throws HyracksDataException { - return getNonNullResultJobRecord(jobId).getResultSetMetaData(rsId).getMetadata(); + return getNonNullResultJobRecord(jobId).getResultSetMetaData().getMetadata(); } @Override @@ -228,7 +228,6 @@ private ResultDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, ResultDirectoryRecord[] knownRecords) throws HyracksDataException { ResultJobRecord djr = getNonNullResultJobRecord(jobId); 
- if (djr.getStatus().getState() == State.FAILED) { List<Exception> caughtExceptions = djr.getStatus().getExceptions(); if (caughtExceptions != null && !caughtExceptions.isEmpty()) { @@ -241,13 +240,11 @@ throw HyracksDataException.create(ErrorCode.RESULT_FAILURE_NO_EXCEPTION, rsId, jobId); } } - - final ResultSetMetaData resultSetMetaData = djr.getResultSetMetaData(rsId); + final ResultSetMetaData resultSetMetaData = djr.getResultSetMetaData(); if (resultSetMetaData == null) { return null; } ResultDirectoryRecord[] records = resultSetMetaData.getRecords(); - return Arrays.equals(records, knownRecords) ? null : records; } @@ -255,7 +252,7 @@ for (JobId jId : getJobIds()) { pw.print(jId.toString()); pw.print(" - "); - pw.println(String.valueOf(getResultJobRecord(jId))); + pw.println(getResultJobRecord(jId)); } pw.flush(); return pw; diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java deleted file mode 100644 index 0d4d0e8..0000000 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.tests.integration; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; - -import org.apache.hyracks.api.constraints.PartitionConstraintHelper; -import org.apache.hyracks.api.dataflow.IOperatorDescriptor; -import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.io.FileSplit; -import org.apache.hyracks.api.io.ManagedFileSplit; -import org.apache.hyracks.api.job.JobSpecification; -import org.apache.hyracks.api.result.ResultSetId; -import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; -import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory; -import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; -import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor; -import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider; -import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory; -import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor; -import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor; -import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor; -import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider; -import org.junit.Assert; -import org.junit.Test; - -public class ReplicateOperatorTest extends AbstractIntegrationTest { - - public void compareFiles(String 
fileNameA, String fileNameB) throws IOException { - BufferedReader fileA = new BufferedReader(new FileReader(fileNameA)); - BufferedReader fileB = new BufferedReader(new FileReader(fileNameB)); - - String lineA, lineB; - while ((lineA = fileA.readLine()) != null) { - lineB = fileB.readLine(); - Assert.assertEquals(lineA, lineB); - } - Assert.assertNull(fileB.readLine()); - fileA.close(); - fileB.close(); - } - - @Test - public void test() throws Exception { - final int outputArity = 2; - - JobSpecification spec = new JobSpecification(); - - String inputFileName = "data" + File.separator + "nc1" + File.separator + "words.txt"; - File[] outputFile = new File[outputArity]; - for (int i = 0; i < outputArity; i++) { - outputFile[i] = File.createTempFile("replicateop", null); - outputFile[i].deleteOnExit(); - } - - FileSplit[] inputSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, inputFileName) }; - - String[] locations = new String[] { NC1_ID }; - - DelimitedDataTupleParserFactory stringParser = new DelimitedDataTupleParserFactory( - new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '\u0000'); - RecordDescriptor stringRec = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), }); - - FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, - new ConstantFileSplitProvider(inputSplits), stringParser, stringRec); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, locations); - - ReplicateOperatorDescriptor replicateOp = new ReplicateOperatorDescriptor(spec, stringRec, outputArity); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, replicateOp, locations); - - IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length]; - for (int i = 0; i < outputArity; i++) { - ResultSetId rsId = new ResultSetId(i); - spec.addResultSetId(rsId); - - outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, null, false, - 
ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider(), 1); - PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations); - } - - spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, replicateOp, 0); - for (int i = 0; i < outputArity; i++) { - spec.connect(new OneToOneConnectorDescriptor(spec), replicateOp, i, outputOp[i], 0); - } - - for (int i = 0; i < outputArity; i++) { - spec.addRoot(outputOp[i]); - } - String[] expectedResultsFileNames = new String[outputArity]; - for (int i = 0; i < outputArity; i++) { - expectedResultsFileNames[i] = "data" + File.separator + "device0" + File.separator + inputFileName; - } - runTestAndCompareResults(spec, expectedResultsFileNames); - } -} -- To view, visit https://asterix-gerrit.ics.uci.edu/3432 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-MessageType: merged Gerrit-Change-Id: I37816efc92ee9f5e66f29ce74dec4c6c5bd07c6f Gerrit-Change-Number: 3432 Gerrit-PatchSet: 3 Gerrit-Owner: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Anon. E. Moose (1000171) Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Till Westmann <ti...@apache.org>