http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java index 1ddb7e8..2879576 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateDescriptor.java @@ -23,14 +23,14 @@ import java.io.IOException; import org.apache.asterix.dataflow.data.nontagged.serde.ADateSerializerDeserializer; import org.apache.asterix.om.base.temporal.DateTimeFormatUtils; -import org.apache.asterix.om.base.temporal.GregorianCalendarSystem; import org.apache.asterix.om.base.temporal.DateTimeFormatUtils.DateTimeParseMode; -import org.apache.asterix.runtime.exceptions.TypeMismatchException; +import org.apache.asterix.om.base.temporal.GregorianCalendarSystem; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.functions.IFunctionDescriptorFactory; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor; +import org.apache.asterix.runtime.exceptions.TypeMismatchException; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; @@ -45,8 +45,6 @@ import org.apache.hyracks.util.string.UTF8StringWriter; public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; - public final static FunctionIdentifier FID = BuiltinFunctions.PRINT_DATE; - private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance(); public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { @@ -76,6 +74,8 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor private StringBuilder sbder = new StringBuilder(); private final UTF8StringWriter utf8Writer = new UTF8StringWriter(); + private final DateTimeFormatUtils util = DateTimeFormatUtils.getInstance(); + @Override public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException { resultStorage.reset(); @@ -101,11 +101,11 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor int formatLength = UTF8StringUtil.getUTFLength(bytes1, offset1 + 1); int offset = UTF8StringUtil.getNumBytesToStoreLength(formatLength); sbder.delete(0, sbder.length()); - DT_UTILS.printDateTime(chronon, 0, bytes1, offset1 + 1 + offset, formatLength, sbder, + util.printDateTime(chronon, 0, bytes1, offset1 + 1 + offset, formatLength, sbder, DateTimeParseMode.DATE_ONLY); out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG); - utf8Writer.writeUTF8(sbder.toString(), out); + utf8Writer.writeUTF8(sbder, out); } catch (IOException ex) { throw new HyracksDataException(ex); } @@ -113,7 +113,6 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor } }; } - }; } @@ -122,7 +121,6 @@ public class PrintDateDescriptor extends AbstractScalarFunctionDynamicDescriptor */ @Override public FunctionIdentifier getIdentifier() { - return FID; + return BuiltinFunctions.PRINT_DATE; } - }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java index b9f6e50..6f22bf1 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintDateTimeDescriptor.java @@ -24,12 +24,12 @@ import java.io.IOException; import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer; import org.apache.asterix.om.base.temporal.DateTimeFormatUtils; import org.apache.asterix.om.base.temporal.DateTimeFormatUtils.DateTimeParseMode; -import org.apache.asterix.runtime.exceptions.TypeMismatchException; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.functions.IFunctionDescriptorFactory; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor; +import org.apache.asterix.runtime.exceptions.TypeMismatchException; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; @@ -44,8 +44,6 @@ import org.apache.hyracks.util.string.UTF8StringWriter; public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; - public final static FunctionIdentifier FID = BuiltinFunctions.PRINT_DATETIME; - private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance(); public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { @@ -75,6 +73,8 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri private UTF8StringWriter utf8Writer = new UTF8StringWriter(); private UTF8StringPointable utf8Ptr = new UTF8StringPointable(); + private final DateTimeFormatUtils util = DateTimeFormatUtils.getInstance(); + @Override public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException { resultStorage.reset(); @@ -100,11 +100,11 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri utf8Ptr.set(bytes1, offset1 + 1, len1 - 1); int formatLength = utf8Ptr.getUTF8Length(); sbder.delete(0, sbder.length()); - DT_UTILS.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(), + util.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(), formatLength, sbder, DateTimeParseMode.DATETIME); out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG); - utf8Writer.writeUTF8(sbder.toString(), out); + utf8Writer.writeUTF8(sbder, out); } catch (IOException ex) { throw new HyracksDataException(ex); } @@ -112,7 +112,6 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri } }; } - }; } @@ -121,7 +120,6 @@ public class PrintDateTimeDescriptor extends AbstractScalarFunctionDynamicDescri */ @Override public FunctionIdentifier getIdentifier() { - return FID; + return BuiltinFunctions.PRINT_DATETIME; } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java index 51c9dcd..0e7004f 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/PrintTimeDescriptor.java @@ -24,12 +24,12 @@ import java.io.IOException; import org.apache.asterix.dataflow.data.nontagged.serde.ATimeSerializerDeserializer; import org.apache.asterix.om.base.temporal.DateTimeFormatUtils; import org.apache.asterix.om.base.temporal.DateTimeFormatUtils.DateTimeParseMode; -import org.apache.asterix.runtime.exceptions.TypeMismatchException; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.functions.IFunctionDescriptor; import org.apache.asterix.om.functions.IFunctionDescriptorFactory; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor; +import org.apache.asterix.runtime.exceptions.TypeMismatchException; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; @@ -44,8 +44,6 @@ import org.apache.hyracks.util.string.UTF8StringWriter; public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; - public final static FunctionIdentifier FID = BuiltinFunctions.PRINT_TIME; - private final static DateTimeFormatUtils DT_UTILS = DateTimeFormatUtils.getInstance(); public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { @@ -74,6 +72,8 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor private final UTF8StringWriter writer = new UTF8StringWriter(); private final UTF8StringPointable utf8Ptr = new UTF8StringPointable(); + private final DateTimeFormatUtils util = DateTimeFormatUtils.getInstance(); + @Override public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException { resultStorage.reset(); @@ -100,11 +100,11 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor utf8Ptr.set(bytes1, offset1 + 1, len1 - 1); int formatLength = utf8Ptr.getUTF8Length(); sbder.delete(0, sbder.length()); - DT_UTILS.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(), + util.printDateTime(chronon, 0, utf8Ptr.getByteArray(), utf8Ptr.getCharStartOffset(), formatLength, sbder, DateTimeParseMode.TIME_ONLY); out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG); - writer.writeUTF8(sbder.toString(), out); + writer.writeUTF8(sbder, out); } catch (IOException ex) { throw new HyracksDataException(ex); } @@ -112,7 +112,6 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor } }; } - }; } @@ -121,7 +120,6 @@ public class PrintTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor */ @Override public FunctionIdentifier getIdentifier() { - return FID; + return BuiltinFunctions.PRINT_TIME; } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java index 1100335..7a59926 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksJobletContext.java @@ -31,6 +31,8 @@ public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocat JobId getJobId(); + long getJobStartTime(); + ICounterContext getCounterContext(); Object getGlobalJobData(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java index ac06344..5245571 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java @@ -518,7 +518,8 @@ public class JobExecutor { byte[] jagBytes = changed ? acgBytes : null; node.getNodeController().startTasks(deploymentId, jobId, jagBytes, taskDescriptors, connectorPolicies, jobRun.getFlags(), - ccs.createOrGetJobParameterByteStore(jobId).getParameterMap(), deployedJobSpecId); + ccs.createOrGetJobParameterByteStore(jobId).getParameterMap(), deployedJobSpecId, + jobRun.getStartTime()); } } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java index 9ec55f4..92764a7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java @@ -38,7 +38,7 @@ import org.apache.hyracks.control.common.job.TaskAttemptDescriptor; public interface INodeController { void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, - Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) + Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId, long startTime) throws Exception; void abortTasks(JobId jobId, List<TaskAttemptId> tasks) throws Exception; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index c10c8981..8e02936 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -764,11 +764,12 @@ public class CCNCFunctions { private final Set<JobFlag> flags; private final Map<byte[], byte[]> jobParameters; private final DeployedJobSpecId deployedJobSpecId; + private final long jobStartTime; public StartTasksFunction(DeploymentId deploymentId, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, Set<JobFlag> flags, - Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) { + Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId, long jobStartTime) { this.deploymentId = deploymentId; this.jobId = jobId; this.planBytes = planBytes; @@ -777,6 +778,7 @@ public class CCNCFunctions { this.flags = flags; this.jobParameters = jobParameters; this.deployedJobSpecId = deployedJobSpecId; + this.jobStartTime = jobStartTime; } @Override @@ -816,6 +818,10 @@ public class CCNCFunctions { return flags; } + public long getJobStartTime() { + return jobStartTime; + } + public static Object deserialize(ByteBuffer buffer, int length) throws Exception { ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), length); DataInputStream dis = new DataInputStream(bais); @@ -885,8 +891,10 @@ public class CCNCFunctions { deployedJobSpecId = DeployedJobSpecId.create(dis); } + long jobStartTime = dis.readLong(); + return new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, connectorPolicies, flags, - jobParameters, deployedJobSpecId); + jobParameters, deployedJobSpecId, jobStartTime); } public static void serialize(OutputStream out, Object object) throws Exception { @@ -935,11 +943,13 @@ public class CCNCFunctions { } //write deployed job spec id - dos.writeBoolean(fn.getDeployedJobSpecId() == null ? false : true); + dos.writeBoolean(fn.getDeployedJobSpecId() != null); if (fn.getDeployedJobSpecId() != null) { fn.getDeployedJobSpecId().writeFields(dos); } + //write job start time + dos.writeLong(fn.jobStartTime); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index 429cb26..b78e53f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -63,10 +63,10 @@ public class NodeControllerRemoteProxy implements INodeController { @Override public void startTasks(DeploymentId deploymentId, JobId jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies, - Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) - throws Exception { + Set<JobFlag> flags, Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId, + long jobStartTime) throws Exception { StartTasksFunction stf = new StartTasksFunction(deploymentId, jobId, planBytes, taskDescriptors, - connectorPolicies, flags, jobParameters, deployedJobSpecId); + connectorPolicies, flags, jobParameters, deployedJobSpecId, jobStartTime); ipcHandle.send(-1, stf, null); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java index 8790434..55bc192 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java @@ -103,9 +103,11 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { private final IJobletEventListenerFactory jobletEventListenerFactory; + private final long jobStartTime; + public Joblet(NodeControllerService nodeController, DeploymentId deploymentId, JobId jobId, INCServiceContext serviceCtx, ActivityClusterGraph acg, - IJobletEventListenerFactory jobletEventListenerFactory) { + IJobletEventListenerFactory jobletEventListenerFactory, long jobStartTime) { this.nodeController = nodeController; this.serviceCtx = serviceCtx; this.deploymentId = deploymentId; @@ -131,6 +133,7 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { } IGlobalJobDataFactory gjdf = acg.getGlobalJobDataFactory(); globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null; + this.jobStartTime = jobStartTime; } @Override @@ -151,6 +154,11 @@ public class Joblet implements IHyracksJobletContext, ICounterContext { return env; } + @Override + public long getJobStartTime() { + return jobStartTime; + } + public void addTask(Task task) { taskMap.put(task.getTaskAttemptId(), task); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java index f55e250..735f7cf 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java @@ -65,7 +65,7 @@ final class NodeControllerIPCI implements IIPCI { ncs.getWorkQueue() .schedule(new StartTasksWork(ncs, stf.getDeploymentId(), stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags(), - stf.getJobParameters(), stf.getDeployedJobSpecId())); + stf.getJobParameters(), stf.getDeployedJobSpecId(), stf.getJobStartTime())); return; case ABORT_TASKS: CCNCFunctions.AbortTasksFunction atf = (CCNCFunctions.AbortTasksFunction) fn; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index 6a5785a..660621e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -95,10 +95,12 @@ public class StartTasksWork extends AbstractWork { private final Map<byte[], byte[]> jobParameters; + private final long jobStartTime; + public StartTasksWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes, List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, Set<JobFlag> flags, - Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId) { + Map<byte[], byte[]> jobParameters, DeployedJobSpecId deployedJobSpecId, long jobStartTime) { this.ncs = ncs; this.deploymentId = deploymentId; this.jobId = jobId; @@ -108,6 +110,7 @@ public class StartTasksWork extends AbstractWork { this.connectorPoliciesMap = connectorPoliciesMap; this.flags = flags; this.jobParameters = jobParameters; + this.jobStartTime = jobStartTime; } @Override @@ -212,7 +215,7 @@ public class StartTasksWork extends AbstractWork { } listenerFactory.updateListenerJobParameters(ncs.createOrGetJobParameterByteStore(jobId)); } - ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg, listenerFactory); + ji = new Joblet(ncs, deploymentId, jobId, appCtx, acg, listenerFactory, jobStartTime); jobletMap.put(jobId, ji); } return ji; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/8d284433/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java index 1d38d3d..b022b15 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestJobletContext.java @@ -39,6 +39,7 @@ public class TestJobletContext implements IHyracksJobletContext { private final FrameManager frameManger; private JobId jobId; private WorkspaceFileFactory fileFactory; + private final long jobStartTime; public TestJobletContext(int frameSize, INCServiceContext serviceContext, JobId jobId) throws HyracksException { this.frameSize = frameSize; @@ -46,6 +47,7 @@ public class TestJobletContext implements IHyracksJobletContext { this.jobId = jobId; fileFactory = new WorkspaceFileFactory(this, (IIOManager) getIOManager()); this.frameManger = new FrameManager(frameSize); + this.jobStartTime = System.currentTimeMillis(); } ByteBuffer allocateFrame() throws HyracksDataException { @@ -113,6 +115,11 @@ public class TestJobletContext implements IHyracksJobletContext { } @Override + public long getJobStartTime() { + return jobStartTime; + } + + @Override public Object getGlobalJobData() { return null; }