pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394848872
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java
 ##########
 @@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, replacing the previous {@link AbstractStreamOperator}.
+ * Currently intended to work with {@link MultipleInputStreamOperator}.
+ *
+ * <p>One notable difference in comparison to {@link AbstractStreamOperator} is the lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation
+ * in the constructor, and the removal of some tight coupling with classes like {@link StreamTask}.
+ *
+ * <p>Methods are guaranteed not to be called concurrently.
+ *
+ * @param <OUT> The output type of the operator
+ */
+@Experimental
+public abstract class StreamOperatorBase<OUT> implements StreamOperator<OUT> {
+       /** The logger used by the operator class and its subclasses. */
+       protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorBase.class);
+
+       protected final StreamConfig config;
+       protected final Output<StreamRecord<OUT>> output;
+       private final StreamingRuntimeContext runtimeContext;
+       private final ExecutionConfig executionConfig;
+       private final ClassLoader userCodeClassLoader;
+       private final CloseableRegistry cancelables;
+       private final long[] inputWatermarks;
+
+       /** Metric group for the operator. */
+       protected final OperatorMetricGroup metrics;
+       protected final LatencyStats latencyStats;
+       protected final ProcessingTimeService processingTimeService;
+
+       private StreamOperatorStateHandler stateHandler;
+
+       // We keep track of the watermarks from all inputs; the combined watermark is their minimum.
+       // Once the minimum advances, we emit a new watermark for downstream operators.
+       private long combinedWatermark = Long.MIN_VALUE;
+
+       public StreamOperatorBase(StreamOperatorInitializer<OUT> initializer, 
int numberOfInputs) {
+               inputWatermarks = new long[numberOfInputs];
+               Arrays.fill(inputWatermarks, Long.MIN_VALUE);
+               final Environment environment = 
initializer.getContainingTask().getEnvironment();
+               config = initializer.getStreamConfig();
+               CountingOutput<OUT> countingOutput;
+               OperatorMetricGroup operatorMetricGroup;
+               try {
+                       operatorMetricGroup = 
environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), 
config.getOperatorName());
+                       countingOutput = new 
CountingOutput(initializer.getOutput(), 
operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
+                       if (config.isChainStart()) {
+                               
operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
+                       }
+                       if (config.isChainEnd()) {
+                               
operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
+                       }
+               } catch (Exception e) {
+                       LOG.warn("An error occurred while instantiating task 
metrics.", e);
+                       countingOutput = null;
+                       operatorMetricGroup = null;
+               }
+
+               if (countingOutput == null || operatorMetricGroup == null) {
+                       metrics = 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+                       output = initializer.getOutput();
+               }
+               else {
+                       metrics = operatorMetricGroup;
+                       output = countingOutput;
+               }
+
+               latencyStats = createLatencyStats(
+                       environment.getTaskManagerInfo().getConfiguration(),
+                       
initializer.getContainingTask().getIndexInSubtaskGroup());
+
+               processingTimeService = 
Preconditions.checkNotNull(initializer.getProcessingTimeService());
+               executionConfig = 
initializer.getContainingTask().getExecutionConfig();
+               userCodeClassLoader = 
initializer.getContainingTask().getUserCodeClassLoader();
+               cancelables = initializer.getContainingTask().getCancelables();
+
+               runtimeContext = new StreamingRuntimeContext(
+                       environment,
+                       initializer.getContainingTask().getAccumulatorMap(),
+                       operatorMetricGroup,
+                       getOperatorID(),
+                       processingTimeService,
+                       null);
+       }
+
+       private LatencyStats createLatencyStats(Configuration 
taskManagerConfig, int indexInSubtaskGroup) {
+               try {
+                       int historySize = 
taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
+                       if (historySize <= 0) {
+                               LOG.warn("{} has been set to a value equal or 
below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
+                               historySize = 
MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
+                       }
+
+                       final String configuredGranularity = 
taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
+                       LatencyStats.Granularity granularity;
+                       try {
+                               granularity = 
LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT));
+                       } catch (IllegalArgumentException iae) {
+                               granularity = LatencyStats.Granularity.OPERATOR;
+                               LOG.warn(
+                                       "Configured value {} option for {} is 
invalid. Defaulting to {}.",
+                                       configuredGranularity,
+                                       
MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
+                                       granularity);
+                       }
+                       TaskManagerJobMetricGroup jobMetricGroup = 
this.metrics.parent().parent();
+                       return new 
LatencyStats(jobMetricGroup.addGroup("latency"),
+                               historySize,
+                               indexInSubtaskGroup,
+                               getOperatorID(),
+                               granularity);
+               } catch (Exception e) {
+                       LOG.warn("An error occurred while instantiating latency 
metrics.", e);
+                       return new LatencyStats(
+                               
UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
+                               1,
+                               0,
+                               new OperatorID(),
+                               LatencyStats.Granularity.SINGLE);
+               }
+       }
+
+       @Override
+       public MetricGroup getMetricGroup() {
+               return metrics;
+       }
+
+       @Override
+       public final void initializeState(StreamTaskStateInitializer 
streamTaskStateManager) throws Exception {
+               final TypeSerializer<?> keySerializer = 
config.getStateKeySerializer(getUserCodeClassloader());
+
+               final StreamOperatorStateContext context =
+                       streamTaskStateManager.streamOperatorStateContext(
+                               getOperatorID(),
+                               getClass().getSimpleName(),
+                               getProcessingTimeService(),
+                               this,
+                               keySerializer,
+                               cancelables,
+                               metrics);
+
+               stateHandler = new StreamOperatorStateHandler(context, 
getExecutionConfig(), cancelables);
+               stateHandler.initializeOperatorState(this::initializeState);
+       }
+
+       /**
+        * This method is called immediately before any elements are processed, 
it should contain the
+        * operator's initialization logic, e.g. state initialization.
+        *
+        * <p>The default implementation does nothing.
+        *
+        * @throws Exception An exception in this method causes the operator to 
fail.
+        */
+       @Override
+       public void open() throws Exception {}
+
+       /**
+        * This method is called after all records have been added to the 
operators via the methods
+        * {@link OneInputStreamOperator#processElement(StreamRecord)}, or
+        * {@link TwoInputStreamOperator#processElement1(StreamRecord)} and
+        * {@link TwoInputStreamOperator#processElement2(StreamRecord)}.
+        *
+        * <p>The method is expected to flush all remaining buffered data. Exceptions during this flushing
+        * of buffered data should be propagated, in order to cause the operation to be recognized as failed,
+        * because the last data items are not processed properly.
+        *
+        * @throws Exception An exception in this method causes the operator to 
fail.
+        */
+       @Override
+       public void close() throws Exception {}
+
+       /**
+        * This method is called at the very end of the operator's life, both 
in the case of a successful
+        * completion of the operation, and in the case of a failure and 
canceling.
+        *
+        * <p>This method is expected to make a thorough effort to release all 
resources
+        * that the operator has acquired.
+        */
+       @Override
+       public void dispose() throws Exception {
+               if (stateHandler != null) {
+                       stateHandler.dispose();
+               }
+       }
+
+       @Override
+       public void prepareSnapshotPreBarrier(long checkpointId) throws 
Exception {
+               // the default implementation does nothing and accepts the 
checkpoint
+               // this is purely for subclasses to override
+       }
+
+       @Override
+       public final OperatorSnapshotFutures snapshotState(long checkpointId, 
long timestamp, CheckpointOptions checkpointOptions,
+                       CheckpointStreamFactory factory) throws Exception {
+               return stateHandler.snapshotState(
+                       this::snapshotState,
+                       getOperatorName(),
+                       checkpointId,
+                       timestamp,
+                       checkpointOptions,
+                       factory);
+       }
+
+       /**
+        * Stream operators with state, which want to participate in a snapshot 
need to override this hook method.
+        *
+        * @param context context that provides information and means required 
for taking a snapshot
+        */
+       public void snapshotState(StateSnapshotContext context) throws 
Exception {
+       }
+
+       /**
+        * Stream operators with state which can be restored need to override 
this hook method.
+        *
+        * @param context context that allows to register different states.
+        */
+       public void initializeState(StateInitializationContext context) throws 
Exception {
+
+       }
+
+       @Override
+       public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+               stateHandler.notifyCheckpointComplete(checkpointId);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Properties and Services
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the execution config defined on the execution environment of 
the job to which this
+        * operator belongs.
+        *
+        * @return The job's execution config.
+        */
+       public ExecutionConfig getExecutionConfig() {
+               return executionConfig;
+       }
+
+       public StreamConfig getOperatorConfig() {
+               return config;
+       }
+
+       public ClassLoader getUserCodeClassloader() {
+               return userCodeClassLoader;
+       }
+
+       /**
+        * Return the operator name. If the runtime context has been set, then 
the task name with
+        * subtask index is returned. Otherwise, the simple class name is 
returned.
+        *
+        * @return If runtime context is set, then return task name with 
subtask index. Otherwise return
+        *                      simple class name.
+        */
+       protected String getOperatorName() {
+               if (runtimeContext != null) {
+                       return runtimeContext.getTaskNameWithSubtasks();
+               } else {
+                       return getClass().getSimpleName();
+               }
+       }
+
+       /**
+        * Returns a context that allows the operator to query information 
about the execution and also
+        * to interact with systems such as broadcast variables and managed 
state. This also allows
+        * to register timers.
+        */
+       public StreamingRuntimeContext getRuntimeContext() {
+               return runtimeContext;
+       }
+
+       @SuppressWarnings("unchecked")
+       public <K> KeyedStateBackend<K> getKeyedStateBackend() {
+               return (KeyedStateBackend<K>) 
stateHandler.getKeyedStateBackend();
+       }
+
+       public OperatorStateBackend getOperatorStateBackend() {
+               return stateHandler.getOperatorStateBackend();
+       }
+
+       /**
+        * Returns the {@link ProcessingTimeService} responsible for getting 
the current
+        * processing time and registering timers.
+        */
+       public ProcessingTimeService getProcessingTimeService() {
 
 Review comment:
   At least for now, not really. This and other methods are used in various
places outside of an operator: some only in tests, others not only in tests. I
would prefer to keep it as is in the first iteration, on the assumption that if
it was exposed before, there were some reasons behind it, and especially in
order to make the transition from V1 to V2 as smooth as possible.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to