[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-25 GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397807817
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations.
+ */
+@Internal
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend<?> keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+         StreamOperatorStateContext context,
+         ExecutionConfig executionConfig,
+         CloseableRegistry closeableRegistry) {
+      this.context = context;
+      operatorStateBackend = context.operatorStateBackend();
+      keyedStateBackend = context.keyedStateBackend();
+      this.closeableRegistry = closeableRegistry;
+
+      if (keyedStateBackend != null) {
+         keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+      }
+      else {
+         keyedStateStore = null;
+      }
+   }
+
+   public void initializeOperatorState(CheckpointedStreamOperator streamOperator) throws Exception {
+      CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
+      CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
+
+      try {
+         StateInitializationContext initializationContext = new StateInitializationContextImpl(
+ 
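The hunk stops at the start of `initializeOperatorState`. A minimal sketch of the control flow such a method typically needs, assuming it follows the same pattern as `AbstractStreamOperator#initializeState`: the operator's callback runs inside `try`, and the raw keyed/operator state inputs are unregistered from the registry and closed in `finally`, even when the operator throws. All types below (`InitContext`, `CheckpointedOperator`, `Registry`, `RawInputs`, `StateHandlerSketch`) are hypothetical simplifications, not Flink classes; `CheckpointedOperator` only stands in for the `CheckpointedStreamOperator` parameter and is assumed to expose an `initializeState(context)` callback.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for StateInitializationContext.
interface InitContext {
    boolean isRestored();
}

// Hypothetical stand-in for CheckpointedStreamOperator: the operator exposes one callback.
interface CheckpointedOperator {
    void initializeState(InitContext context) throws Exception;
}

// Hypothetical stand-in for CloseableRegistry; it only tracks ownership here,
// the closing-on-cancellation part is left out.
final class Registry {
    private final Set<Closeable> registered = new HashSet<>();

    void register(Closeable closeable) {
        registered.add(closeable);
    }

    // Returns true if the closeable was still registered, i.e. the caller still owns it.
    boolean unregister(Closeable closeable) {
        return registered.remove(closeable);
    }

    int size() {
        return registered.size();
    }
}

// Hypothetical stand-in for the raw keyed/operator state input iterables.
final class RawInputs implements Closeable {
    private boolean closed;

    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

final class StateHandlerSketch {
    private final Registry registry;
    private final RawInputs keyedStateInputs;
    private final RawInputs operatorStateInputs;

    StateHandlerSketch(Registry registry, RawInputs keyedStateInputs, RawInputs operatorStateInputs) {
        this.registry = registry;
        this.keyedStateInputs = keyedStateInputs;
        this.operatorStateInputs = operatorStateInputs;
        registry.register(keyedStateInputs);
        registry.register(operatorStateInputs);
    }

    void initializeOperatorState(CheckpointedOperator operator, boolean restored) throws Exception {
        try {
            operator.initializeState(() -> restored);
        } finally {
            // Runs even if the operator throws, so the raw inputs never leak.
            closeFromRegistry(operatorStateInputs);
            closeFromRegistry(keyedStateInputs);
        }
    }

    private void closeFromRegistry(Closeable closeable) {
        // Close only if still registered, i.e. nobody else has taken ownership.
        if (registry.unregister(closeable)) {
            try {
                closeable.close();
            } catch (IOException ignored) {
                // Best effort, in the spirit of a quiet close.
            }
        }
    }
}
```

Handing the operator a single callback keeps the handler in charge of the resource lifecycle; the operator never has to know that raw state streams exist.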

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-25 GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397806237
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,310 @@
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.IOUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations.
+ */
+@Internal
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend<?> keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+         StreamOperatorStateContext context,
+         ExecutionConfig executionConfig,
+         CloseableRegistry closeableRegistry) {
+      this.context = context;
+      operatorStateBackend = context.operatorStateBackend();
+      keyedStateBackend = context.keyedStateBackend();
+      this.closeableRegistry = closeableRegistry;
+
+      if (keyedStateBackend != null) {
+         keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+      }
+      else {
+         keyedStateStore = null;
+      }
+   }
+
+   public void initializeOperatorState(CheckpointedStreamOperator streamOperator) throws Exception {
+      CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
+      CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
+
+      try {
+         StateInitializationContext initializationContext = new StateInitializationContextImpl(
+ 

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397172292
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,297 @@
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations.
+ */
+@Internal
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend<?> keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+         StreamOperatorStateContext context,
+         ExecutionConfig executionConfig,
+         CloseableRegistry closeableRegistry) {
+      this.context = context;
+      operatorStateBackend = context.operatorStateBackend();
+      keyedStateBackend = context.keyedStateBackend();
+      this.closeableRegistry = closeableRegistry;
+
+      if (keyedStateBackend != null) {
+         keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+      }
+      else {
+         keyedStateStore = null;
+      }
+   }
+
+   public void initializeOperatorState(ThrowingConsumer initializeOperatorAction) throws Exception {
+      CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
+      CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
+
+      try {
+         StateInitializationContext initializationContext = new StateInitializationContextImpl(
+
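In this revision `initializeOperatorState` takes a `ThrowingConsumer` callback, whereas the 2020-03-25 revision of the same file takes a `CheckpointedStreamOperator`. A minimal sketch of the callback style, assuming the consumer is invoked once with the initialization context. The local `ThrowingConsumer` only mirrors the shape of `org.apache.flink.util.function.ThrowingConsumer` (`accept(T) throws E`); the `String` context, `CallbackStyleHandler` and `MyOperator` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in with the same shape as org.apache.flink.util.function.ThrowingConsumer.
@FunctionalInterface
interface ThrowingConsumer<T, E extends Throwable> {
    void accept(T t) throws E;
}

// Hypothetical handler: the String stands in for the StateInitializationContext.
final class CallbackStyleHandler {
    private final List<String> events = new ArrayList<>();

    void initializeOperatorState(ThrowingConsumer<String, Exception> initializeOperatorAction)
            throws Exception {
        events.add("open raw inputs");
        try {
            initializeOperatorAction.accept("init-context");
        } finally {
            events.add("close raw inputs");
        }
    }

    List<String> events() {
        return events;
    }
}

// Hypothetical operator: nothing ties it to the handler except the method shape.
final class MyOperator {
    final List<String> seenContexts = new ArrayList<>();

    void initializeState(String context) {
        seenContexts.add(context);
    }
}
```

With the callback, any method of matching shape can be passed as a method reference (`handler.initializeOperatorState(operator::initializeState)`); the interface-based variant trades that flexibility for an explicit, named contract on the operator.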

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397107847
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,297 @@
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations.
+ */
+@Internal
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend<?> keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+         StreamOperatorStateContext context,
+         ExecutionConfig executionConfig,
+         CloseableRegistry closeableRegistry) {
+      this.context = context;
+      operatorStateBackend = context.operatorStateBackend();
+      keyedStateBackend = context.keyedStateBackend();
+      this.closeableRegistry = closeableRegistry;
+
+      if (keyedStateBackend != null) {
+         keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+      }
+      else {
+         keyedStateStore = null;
+      }
+   }
+
+   public void initializeOperatorState(ThrowingConsumer initializeOperatorAction) throws Exception {
+      CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
+      CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
+
+      try {
+         StateInitializationContext initializationContext = new StateInitializationContextImpl(
+

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397100621
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/YieldingOperatorFactory.java
 ##
 @@ -17,10 +17,13 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 /**
  * An operator that needs access to the {@link MailboxExecutor} to yield to downstream operators needs to be created
  * through a factory implementing this interface.
  */
+@PublicEvolving
 public interface YieldingOperatorFactory<OUT> extends StreamOperatorFactory<OUT> {
 
 Review comment:
   Maybe let's keep it as it is for now? I'm not sure I like the idea of 
nullifying parameters based on `needsMailboxExecutor()`. I think I would prefer 
the mixed pattern to that, but this probably needs some deeper thought.
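The comment weighs two ways of giving operator factories access to the mailbox: passing it to every factory but nulling it out unless `needsMailboxExecutor()` returns true, versus an opt-in interface, which is what `YieldingOperatorFactory` is according to its javadoc (presumably the "mixed pattern" meant here). A minimal sketch of both, with hypothetical names (`Mailbox`, `OperatorFactory`, `YieldingFactory`, `ParameterizedFactory`, `FactoryWiring`) rather than Flink's real signatures:

```java
// Hypothetical stand-in for Flink's MailboxExecutor.
interface Mailbox {
    void execute(Runnable command);
}

// Pattern A (opt-in interface): every factory creates operators the same way...
interface OperatorFactory {
    String createOperator();
}

// ...and only the factories that need the mailbox implement an extra interface.
interface YieldingFactory extends OperatorFactory {
    void setMailbox(Mailbox mailbox);
}

// Pattern B (nullified parameter): the mailbox is always passed,
// but it is null unless the factory declared needsMailboxExecutor().
interface ParameterizedFactory {
    default boolean needsMailboxExecutor() {
        return false;
    }

    String createOperator(Mailbox mailboxOrNull);
}

final class FactoryWiring {
    private FactoryWiring() {
    }

    static String createWithOptIn(OperatorFactory factory, Mailbox mailbox) {
        // The factory's type says whether it needs the mailbox; no null is passed around.
        if (factory instanceof YieldingFactory) {
            ((YieldingFactory) factory).setMailbox(mailbox);
        }
        return factory.createOperator();
    }

    static String createWithNullableParameter(ParameterizedFactory factory, Mailbox mailbox) {
        // Every implementation has to remember that its argument may be null.
        Mailbox passed = factory.needsMailboxExecutor() ? mailbox : null;
        return factory.createOperator(passed);
    }
}
```

With the opt-in interface, the need for a mailbox is visible in the factory's type and no implementation ever sees a null; with the nullable parameter, a single creation method covers all factories, but each one has to handle the null case.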


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397100621
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/YieldingOperatorFactory.java
 ##
 @@ -17,10 +17,13 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 /**
  * An operator that needs access to the {@link MailboxExecutor} to yield to downstream operators needs to be created
  * through a factory implementing this interface.
  */
+@PublicEvolving
 public interface YieldingOperatorFactory<OUT> extends StreamOperatorFactory<OUT> {
 
 Review comment:
   Maybe let's keep it as it is for now? I'm not sure I like the idea of 
nullifying parameters based on `needsMailboxExecutor()`. I think I would prefer 
the mixed pattern to that.




[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-24 GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r397067956
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend<?> keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager<?> timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+         StreamOperatorStateContext context,
+         ExecutionConfig executionConfig,
+         CloseableRegistry closeableRegistry) {
+      this.context = context;
+      operatorStateBackend = context.operatorStateBackend();
+      keyedStateBackend = context.keyedStateBackend();
+      this.closeableRegistry = closeableRegistry;
+
+      if (keyedStateBackend != null) {
+         keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+      }
+      else {
+         keyedStateStore = null;
+      }
+
+      timeServiceManager = context.internalTimerServiceManager();
+ 
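Both `keyedStateBackend` and `keyedStateStore` are `@Nullable`: the store is only built when `context.keyedStateBackend()` returns a backend, i.e. on a keyed stream. A minimal sketch of one way to expose such a nullable pair, using the `java.util.Optional` the file already imports plus a fail-fast accessor. `KeyedBackend`, `KeyedStore` and `NullableKeyedStateSketch` are hypothetical stand-ins for `AbstractKeyedStateBackend` and `DefaultKeyedStateStore`, and the error message is illustrative only.

```java
import java.util.Optional;

// Hypothetical stand-in for AbstractKeyedStateBackend.
final class KeyedBackend {
    final String name;

    KeyedBackend(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for DefaultKeyedStateStore, built on top of a backend.
final class KeyedStore {
    final KeyedBackend backend;

    KeyedStore(KeyedBackend backend) {
        this.backend = backend;
    }
}

final class NullableKeyedStateSketch {
    private final KeyedBackend keyedStateBackend; // null when not on a keyed stream
    private final KeyedStore keyedStateStore;     // derived from the backend, so also nullable

    NullableKeyedStateSketch(KeyedBackend keyedStateBackend) {
        this.keyedStateBackend = keyedStateBackend;
        this.keyedStateStore = keyedStateBackend != null ? new KeyedStore(keyedStateBackend) : null;
    }

    boolean isKeyed() {
        return keyedStateBackend != null;
    }

    // Callers that can cope with a non-keyed stream get an Optional instead of a raw null.
    Optional<KeyedStore> getKeyedStateStore() {
        return Optional.ofNullable(keyedStateStore);
    }

    // Callers that require keyed state fail fast with a descriptive message.
    KeyedStore keyedStateStoreOrFail() {
        if (keyedStateStore == null) {
            throw new IllegalStateException("Keyed state is only available on a keyed stream.");
        }
        return keyedStateStore;
    }
}
```

Deriving the store from the backend in one place keeps the two fields consistent: either both are set or both are null.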

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-23 GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396623514
 
 

 ##
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link 
StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a 
keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+ 

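The handler above keeps both `keyedStateBackend` and `keyedStateStore` as @Nullable, because they only exist on keyed streams. The following is a minimal, self-contained sketch (plain Java, not Flink code) of one way to shield callers from that null by exposing an Optional accessor plus a fail-fast variant. SimpleKeyedStateStore, StateHandlerSketch and requireKeyedStateStore are hypothetical names; the real StreamOperatorStateHandler API may look different.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a keyed state store: one long value per (key, state name).
final class SimpleKeyedStateStore {
    private final Map<String, Long> values = new HashMap<>();

    long get(String key, String stateName) {
        return values.getOrDefault(key + "/" + stateName, 0L);
    }

    void put(String key, String stateName, long value) {
        values.put(key + "/" + stateName, value);
    }
}

// Hypothetical model of a state handler whose keyed store only exists on keyed streams.
final class StateHandlerSketch {
    // Null on non-keyed streams, mirroring the @Nullable field in the diff.
    private final SimpleKeyedStateStore keyedStateStore;

    StateHandlerSketch(boolean keyedStream) {
        this.keyedStateStore = keyedStream ? new SimpleKeyedStateStore() : null;
    }

    // Returning Optional forces callers to handle the non-keyed case explicitly.
    Optional<SimpleKeyedStateStore> getKeyedStateStore() {
        return Optional.ofNullable(keyedStateStore);
    }

    // Fails fast with a descriptive message instead of a NullPointerException later on.
    SimpleKeyedStateStore requireKeyedStateStore() {
        return getKeyedStateStore().orElseThrow(
                () -> new IllegalStateException("Keyed state is only available on a keyed stream"));
    }
}
```

The Optional variant suits code that can degrade gracefully; the require variant suits code that is only valid on keyed streams and should report misuse early.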
[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-23 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396608259
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
 ##
 @@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, intended to eventually replace {@link AbstractStreamOperator}.
+ * Currently it is intended to work smoothly only with {@link MultipleInputStreamOperator}.
+ *
+ * One notable difference in comparison to {@link AbstractStreamOperator} is the lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation
+ * in the constructor, and the removal of some tight coupling with classes like {@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param <OUT> The output type of the operator
+ */
+@Experimental
+public abstract class AbstractStreamOperatorV2<OUT> implements StreamOperator<OUT> {
+   /** The logger used by the operator class and its subclasses. */
+   protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperatorV2.class);
+
+   protected final StreamConfig config;
+   protected final Output<StreamRecord<OUT>> output;
+   private final StreamingRuntimeContext runtimeContext;
+   private final ExecutionConfig executionConfig;
+   private final ClassLoader userCodeClassLoader;
+   private final CloseableRegistry cancelables;
+   private final long[] inputWatermarks;
+
+   /** Metric group for the operator. */
+   protected final OperatorMetricGroup metrics;
+   protected final LatencyStats latencyStats;
+   protected final ProcessingTimeService processingTimeService;
+
+   private StreamOperatorStateHandler stateHandler;
+
+   // We keep track of watermarks from both inputs, the combined input is the minimum
+   

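The javadoc above describes replacing the two-phase setup() call with initialisation in the constructor. A minimal sketch of that idea in plain Java follows, assuming a hypothetical OperatorParams object stands in for whatever the real constructor receives (the actual AbstractStreamOperatorV2 constructor signature is not shown in this hunk). ConstructorInitialisedOperator and UpperCaseOperator are likewise illustrative names only.

```java
import java.util.List;
import java.util.Locale;
import java.util.Objects;

// Hypothetical bundle of everything a task would otherwise hand over in setup().
final class OperatorParams {
    final String operatorName;
    final List<String> output;

    OperatorParams(String operatorName, List<String> output) {
        this.operatorName = Objects.requireNonNull(operatorName, "operatorName");
        this.output = Objects.requireNonNull(output, "output");
    }
}

// Constructor-initialised base class: with no separate setup() step, fields can be
// final and subclasses never observe a half-initialised operator.
abstract class ConstructorInitialisedOperator {
    protected final String operatorName;
    protected final List<String> output;

    protected ConstructorInitialisedOperator(OperatorParams params) {
        this.operatorName = params.operatorName;
        this.output = params.output;
    }

    abstract void processElement(String value);
}

// Hypothetical concrete operator that upper-cases each input element.
final class UpperCaseOperator extends ConstructorInitialisedOperator {
    UpperCaseOperator(OperatorParams params) {
        super(params);
    }

    @Override
    void processElement(String value) {
        output.add(operatorName + ":" + value.toUpperCase(Locale.ROOT));
    }
}
```

The design benefit is that there is no window in which the operator object exists but its dependencies are still unset, which is exactly the state a separate setup() call allows.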
[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-23 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396607696
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations.
+ */
+@Internal
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = 

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-23 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396444130
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations.
+ */
+@Internal
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = 

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-23 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396430496
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+ 

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-23 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r396410040
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
 ##
 @@ -32,14 +30,13 @@
  *
  * @param <OUT> The output type of the operator
  */
-@Internal
+@PublicEvolving
 public interface StreamOperatorFactory<OUT> extends Serializable {
 
 Review comment:
   It should have been `@PublicEvolving` (or `@Experimental`) from the
beginning, as it's on the same level as `StreamOperator`. Also, you need
factories to actually construct any multiple-input stream operator. I could
mark it `@Experimental` if you prefer.

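To illustrate why a factory is needed to construct operators: the diff shows StreamOperatorFactory extending Serializable, so presumably the factory is what gets shipped with the job, and the operator (with its runtime dependencies) is created where it runs. Below is a minimal sketch of that pattern using hypothetical OperatorFactory, JoinOperatorFactory and MultiInputOperator types and plain Java serialization to simulate shipping. This is not the StreamOperatorFactory interface itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

// Hypothetical operator with several inputs; the real Flink interfaces differ.
interface MultiInputOperator {
    String process(List<String> inputs);
}

// Hypothetical serializable factory: only its configuration travels over the wire.
interface OperatorFactory<OP> extends Serializable {
    OP createOperator(int numberOfInputs);
}

final class JoinOperatorFactory implements OperatorFactory<MultiInputOperator> {
    private static final long serialVersionUID = 1L;
    private final String separator;

    JoinOperatorFactory(String separator) {
        this.separator = separator;
    }

    @Override
    public MultiInputOperator createOperator(int numberOfInputs) {
        // Everything the operator needs is passed in at construction time.
        return inputs -> {
            if (inputs.size() != numberOfInputs) {
                throw new IllegalArgumentException("Expected " + numberOfInputs + " inputs");
            }
            return String.join(separator, inputs);
        };
    }
}

final class FactoryShipping {
    // Simulates sending a serializable object to a remote worker and reading it back.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

Note that the operator created by the factory is never serialized here; only the factory and its small configuration are.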

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394915205
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java
 ##
 @@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, replacing the previous {@link AbstractStreamOperator}.
+ * Currently intended to work with {@link MultipleInputStreamOperator}.
+ *
+ * One notable difference in comparison to {@link AbstractStreamOperator} is the lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation
+ * in the constructor, and the removal of some tight coupling with classes like {@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param <OUT> The output type of the operator
+ */
+@Experimental
+public abstract class StreamOperatorBase<OUT> implements StreamOperator<OUT> {
+   /** The logger used by the operator class and its subclasses. */
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorBase.class);
+
+   protected final StreamConfig config;
+   protected final Output<StreamRecord<OUT>> output;
+   private final StreamingRuntimeContext runtimeContext;
+   private final ExecutionConfig executionConfig;
+   private final ClassLoader userCodeClassLoader;
+   private final CloseableRegistry cancelables;
+   private final long[] inputWatermarks;
+
+   /** Metric group for the operator. */
+   protected final OperatorMetricGroup metrics;
+   protected final LatencyStats latencyStats;
+   protected final ProcessingTimeService processingTimeService;
+
+   private StreamOperatorStateHandler stateHandler;
+
+   // We keep track of watermarks from both inputs, the combined input is the minimum
+   // Once the minimum advances we emit a new 

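The (truncated) comment above describes how watermarks are combined across inputs: the combined watermark is the minimum of the per-input watermarks, and a new one is emitted only once that minimum advances. A self-contained sketch of that rule follows, assuming a hypothetical CombinedWatermarkTracker with 0-based input indices and watermarks as plain long timestamps rather than the Flink Watermark class. It is not the operator's actual implementation.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical tracker, not Flink's implementation: keeps the latest watermark per input
// and derives the combined watermark as the minimum across all inputs.
final class CombinedWatermarkTracker {
    private final long[] inputWatermarks;
    private long combinedWatermark = Long.MIN_VALUE;

    CombinedWatermarkTracker(int numberOfInputs) {
        if (numberOfInputs < 1) {
            throw new IllegalArgumentException("At least one input is required");
        }
        inputWatermarks = new long[numberOfInputs];
        // Until an input reports, it holds the combined watermark back at the lowest value.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark for the input at the 0-based inputIndex. Returns the new
    // combined watermark if the minimum advanced, or empty if nothing should be emitted.
    OptionalLong processWatermark(long watermark, int inputIndex) {
        inputWatermarks[inputIndex] = watermark;
        long newMinimum = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            newMinimum = Math.min(newMinimum, w);
        }
        if (newMinimum > combinedWatermark) {
            combinedWatermark = newMinimum;
            return OptionalLong.of(newMinimum);
        }
        return OptionalLong.empty();
    }
}
```

For example, with two inputs a watermark of 10 on input 0 alone emits nothing, because input 1 still holds the minimum back; a later watermark of 5 on input 1 emits 5.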
[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394933689
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+           StreamOperatorStateContext context,
+           ExecutionConfig executionConfig,
+           CloseableRegistry closeableRegistry) {
+       this.context = context;
+       operatorStateBackend = context.operatorStateBackend();
+       keyedStateBackend = context.keyedStateBackend();
+       this.closeableRegistry = closeableRegistry;
+
+       if (keyedStateBackend != null) {
+           keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+       }
+       else {
+           keyedStateStore = null;
+       }
+
+       timeServiceManager = context.internalTimerServiceManager();
+ 
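The constructor above creates a `DefaultKeyedStateStore` only when a keyed state backend exists and leaves the field `null` on non-keyed streams. A minimal sketch of one way to keep that `null` away from callers, using plain-Java stand-ins (`StateHandlerSketch` and `KeyedStore` are hypothetical, not Flink APIs): expose an `Optional` for code that can cope with a missing store, and fail fast with a descriptive message for code that requires keyed state.

```java
import java.util.Optional;

/** Sketch of a handler whose keyed store exists only on keyed streams. */
class StateHandlerSketch {
    /** Hypothetical stand-in for a keyed state store. */
    interface KeyedStore {
        String describe();
    }

    private final KeyedStore keyedStore; // null when the stream is not keyed

    StateHandlerSketch(KeyedStore keyedStoreOrNull) {
        this.keyedStore = keyedStoreOrNull;
    }

    /** For callers that can work without keyed state. */
    Optional<KeyedStore> getKeyedStore() {
        return Optional.ofNullable(keyedStore);
    }

    /** For callers that require keyed state: fail fast with a descriptive message. */
    KeyedStore requireKeyedStore() {
        if (keyedStore == null) {
            throw new IllegalStateException(
                    "Keyed state is only available on a keyed stream.");
        }
        return keyedStore;
    }

    public static void main(String[] args) {
        StateHandlerSketch keyed = new StateHandlerSketch(() -> "keyed store");
        StateHandlerSketch nonKeyed = new StateHandlerSketch(null);
        System.out.println(keyed.requireKeyedStore().describe()); // keyed store
        System.out.println(nonKeyed.getKeyedStore().isPresent()); // false
    }
}
```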

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394932631
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+           StreamOperatorStateContext context,
+           ExecutionConfig executionConfig,
+           CloseableRegistry closeableRegistry) {
+       this.context = context;
+       operatorStateBackend = context.operatorStateBackend();
+       keyedStateBackend = context.keyedStateBackend();
+       this.closeableRegistry = closeableRegistry;
+
+       if (keyedStateBackend != null) {
+           keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+       }
+       else {
+           keyedStateStore = null;
+       }
+
+       timeServiceManager = context.internalTimerServiceManager();
+ 
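The handler is also handed a `CloseableRegistry`, presumably so that resources opened during state access can be closed from outside (for example on task cancellation). A simplified plain-Java sketch of that registry idea, not Flink's implementation (`SimpleCloseableRegistry` is a hypothetical class): resources register while in use, closing the registry closes everything still registered, and a registration that arrives after close is rejected and the resource is closed immediately so it cannot leak.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical, simplified registry: closing it closes every still-registered resource. */
class SimpleCloseableRegistry implements Closeable {
    private final Set<Closeable> registered = new LinkedHashSet<>();
    private boolean closed;

    synchronized void register(Closeable closeable) throws IOException {
        if (closed) {
            // Do not leak resources that arrive after the registry was closed.
            closeable.close();
            throw new IOException("Registry is already closed.");
        }
        registered.add(closeable);
    }

    synchronized boolean unregister(Closeable closeable) {
        return registered.remove(closeable);
    }

    @Override
    public void close() throws IOException {
        Set<Closeable> toClose;
        synchronized (this) {
            if (closed) {
                return;
            }
            closed = true;
            toClose = new LinkedHashSet<>(registered);
            registered.clear();
        }
        // Close outside the lock; keep the first failure and attach the rest as suppressed.
        IOException first = null;
        for (Closeable c : toClose) {
            try {
                c.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) throws IOException {
        SimpleCloseableRegistry registry = new SimpleCloseableRegistry();
        int[] closedCount = {0};
        Closeable stream = () -> closedCount[0]++;
        registry.register(stream);
        registry.close(); // e.g. triggered by task cancellation
        System.out.println(closedCount[0]); // 1
    }
}
```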

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394915205
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java
 ##
 @@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, replacing the previous {@link AbstractStreamOperator}.
+ * Currently intended to work with {@link MultipleInputStreamOperator}.
+ *
+ * One notable difference in comparison to {@link AbstractStreamOperator} is the lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation
+ * in the constructor, and the removal of some tight coupling with classes like {@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param <OUT> The output type of the operator
+ */
+@Experimental
+public abstract class StreamOperatorBase<OUT> implements StreamOperator<OUT> {
+   /** The logger used by the operator class and its subclasses. */
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorBase.class);
+
+   protected final StreamConfig config;
+   protected final Output<StreamRecord<OUT>> output;
+   private final StreamingRuntimeContext runtimeContext;
+   private final ExecutionConfig executionConfig;
+   private final ClassLoader userCodeClassLoader;
+   private final CloseableRegistry cancelables;
+   private final long[] inputWatermarks;
+
+   /** Metric group for the operator. */
+   protected final OperatorMetricGroup metrics;
+   protected final LatencyStats latencyStats;
+   protected final ProcessingTimeService processingTimeService;
+
+   private StreamOperatorStateHandler stateHandler;
+
+   // We keep track of watermarks from both inputs, the combined input is the minimum
+   // Once the minimum advances we emit a new 
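The class javadoc contrasts `AbstractStreamOperator`'s separate `setup(StreamTask, StreamConfig, Output)` call with initialisation in the constructor. A toy, Flink-independent comparison of the two styles (`InitStyles`, `TwoPhaseOperator`, `ConstructorInitOperator` and the nested `Output` are hypothetical stand-ins): with constructor initialisation the dependencies can be `final`, are validated once, and an instance is never observable in a half-initialised state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class InitStyles {
    /** Hypothetical stand-in for an operator output. */
    interface Output<T> {
        void collect(T record);
    }

    /** Two-phase style: the field is mutable and null until setup() runs. */
    static class TwoPhaseOperator {
        private Output<String> output; // null until setup()

        void setup(Output<String> output) {
            this.output = output;
        }

        void process(String value) {
            // Throws NullPointerException if setup() was never called.
            output.collect(value.toUpperCase());
        }
    }

    /** Constructor style: the dependency is final and checked once. */
    static class ConstructorInitOperator {
        private final Output<String> output;

        ConstructorInitOperator(Output<String> output) {
            this.output = Objects.requireNonNull(output, "output");
        }

        void process(String value) {
            output.collect(value.toUpperCase());
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        new ConstructorInitOperator(sink::add).process("a");
        TwoPhaseOperator twoPhase = new TwoPhaseOperator();
        twoPhase.setup(sink::add);
        twoPhase.process("b");
        System.out.println(sink); // [A, B]
    }
}
```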

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394369655
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+           StreamOperatorStateContext context,
+           ExecutionConfig executionConfig,
+           CloseableRegistry closeableRegistry) {
+       this.context = context;
+       operatorStateBackend = context.operatorStateBackend();
+       keyedStateBackend = context.keyedStateBackend();
+       this.closeableRegistry = closeableRegistry;
+
+       if (keyedStateBackend != null) {
+           keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+       }
+       else {
+           keyedStateStore = null;
+       }
+
+       timeServiceManager = context.internalTimerServiceManager();
+ 

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394823900
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
 ##
 @@ -32,14 +30,13 @@
  *
  * @param <OUT> The output type of the operator
  */
-@Internal
+@PublicEvolving
 public interface StreamOperatorFactory<OUT> extends Serializable {
 
/**
 * Create the operator. Sets access to the context and the output.
 */
-   <T extends StreamOperator<OUT>> T createStreamOperator(
-           StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);
+   <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorInitializer<OUT> initializer);
 
 Review comment:
   I agree, and I don't remember what the reason behind the template argument here was; maybe it was just an error that went under our radar when reviewing/merging. Trying to fix it is definitely on my to-do list.
   
   Currently I would lean towards option 2. But I don't want to change the status quo right now, as the PR is already pretty big and I'm afraid this could explode into some larger change/fix, especially since I'm not sure whether there is some reason behind this construct.
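For context on the template argument discussed above: a method-level type parameter such as `<T extends StreamOperator<OUT>> T createStreamOperator(...)` lets each caller choose `T`, so an implementation can only satisfy it with an unchecked cast, and a mismatch surfaces later as a `ClassCastException` at the call site rather than as a compile error. A minimal sketch with hypothetical stand-in types (`Operator`, `Factory`, `MapOperator`, `SinkOperator` are not Flink classes):

```java
/** Hypothetical stand-ins illustrating a caller-chosen generic return type. */
class CallerChosenTypeDemo {
    interface Operator<OUT> { }

    static class MapOperator implements Operator<String> { }

    static class SinkOperator implements Operator<String> { }

    interface Factory<OUT> {
        // T is inferred from the call site, not fixed by the factory.
        <T extends Operator<OUT>> T create();
    }

    static class MapOperatorFactory implements Factory<String> {
        @Override
        @SuppressWarnings("unchecked")
        public <T extends Operator<String>> T create() {
            // The implementation cannot know T; this cast is unchecked (erased to Operator).
            return (T) new MapOperator();
        }
    }

    public static void main(String[] args) {
        Factory<String> factory = new MapOperatorFactory();
        MapOperator ok = factory.create(); // T inferred as MapOperator: works
        System.out.println(ok.getClass().getSimpleName());
        try {
            SinkOperator wrong = factory.create(); // compiles, T inferred as SinkOperator
            System.out.println("unreachable: " + wrong);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException at the call site");
        }
    }
}
```

Dropping the method-level `T` and returning `Operator<OUT>` (or a class-level type parameter) would move that check back to compile time; which variant corresponds to "option 2" in the thread is not stated here.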


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394828846
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
 ##
 @@ -57,7 +57,13 @@
((ProcessingTimeServiceAware) operatorFactory).setProcessingTimeService(processingTimeService);
}
 
-   OP op = operatorFactory.createStreamOperator(containingTask, configuration, output);
+   // TODO: what to do with ProcessingTimeServiceAware?
+   OP op = operatorFactory.createStreamOperator(
+   new StreamOperatorInitializer<>(
+   containingTask,
+   configuration,
+   output,
+   processingTimeService));
 
 Review comment:
   > Note for that goal, we would need to get rid of SimpleOperatorFactory: Once an operator has been created, it cannot go back into a factory. If we need that functionality, then I only see the builder pattern as a clean solution, where going back and forth between operator and operator builder is doable
   
   `SimpleOperatorFactory` is not intended to make it possible to go back and forth between operators and factories, but only to provide a backward-compatible class for transporting `SetupableStreamOperator` classes. Also, as it's intended to be removed in the future (we can do that, as it's a `PublicEvolving` API), let's assume in the design that `SimpleOperatorFactory` doesn't exist.
   
   But @AHeise, what do you think we should do with `ProcessingTimeServiceAware`? `@Deprecated` and mark it for removal? Or should I do it in this PR?
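   A minimal sketch of the parameter-object idea behind `StreamOperatorInitializer`, assuming hypothetical names (`OperatorParameters`, `OperatorFactory`, `TimeService`, `TimeServiceAware`) rather than Flink's real classes. The factory takes a single object, so a new construction-time dependency such as a time service can be added without changing every factory signature, and the operator can keep it in a `final` field. Once the time service travels in the parameters, a setter-injection interface like the hypothetical `TimeServiceAware` is no longer needed on this code path.

   ```java
/**
 * Hypothetical sketch of a parameter object for operator construction.
 * The names are illustrative only; they are not Flink's API.
 */
public class ParameterObjectSketch {

    /** Stand-in for a processing-time service. */
    interface TimeService {
        long now();
    }

    /** Bundles construction-time dependencies; new fields do not change factory signatures. */
    static final class OperatorParameters {
        private final String taskName;
        private final TimeService timeService;

        OperatorParameters(String taskName, TimeService timeService) {
            this.taskName = taskName;
            this.timeService = timeService;
        }

        String taskName() {
            return taskName;
        }

        TimeService timeService() {
            return timeService;
        }
    }

    /** Old-style setter injection; redundant once the service is in OperatorParameters. */
    @Deprecated
    interface TimeServiceAware {
        void setTimeService(TimeService timeService);
    }

    /** Receives everything in its constructor, so all fields can be final. */
    static final class Operator {
        private final String taskName;
        private final TimeService timeService;

        Operator(OperatorParameters parameters) {
            this.taskName = parameters.taskName();
            this.timeService = parameters.timeService();
        }

        String describe() {
            return taskName + "@" + timeService.now();
        }
    }

    interface OperatorFactory {
        Operator create(OperatorParameters parameters);
    }

    public static String build() {
        TimeService fixedClock = () -> 42L;
        OperatorFactory factory = Operator::new;
        Operator operator = factory.create(new OperatorParameters("map-1", fixedClock));
        return operator.describe();
    }

    public static void main(String[] args) {
        System.out.println(build()); // map-1@42
    }
}
   ```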


[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394829936
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorInitializer.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * Helper  class to construct {@link StreamOperatorBase}. Wraps couple of internal parameters
+ * to simplify for users construction of classes extending {@link StreamOperatorBase} and to
+ * allow for backward compatible changes in the {@link StreamOperatorBase}'s constructor.
+ */
+@Experimental
+public class StreamOperatorInitializer<OUT> {
 
 Review comment:
   Do you mean adding Javadoc? I've added it.


[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394843938
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java
 ##
 @@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, replacing previous {@link AbstractStreamOperator}.
+ * Currently intended to work with {@link MultipleInputStreamOperator}.
+ *
+ * One note-able difference in comparison to {@link AbstractStreamOperator} is lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation
+ * in the constructor, and removed some tight coupling with classes like {@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param <OUT> The output type of the operator
+ */
+@Experimental
+public abstract class StreamOperatorBase<OUT> implements StreamOperator<OUT> {
+   /** The logger used by the operator class and its subclasses. */
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorBase.class);
+
+   protected final StreamConfig config;
+   protected final Output<StreamRecord<OUT>> output;
+   private final StreamingRuntimeContext runtimeContext;
+   private final ExecutionConfig executionConfig;
+   private final ClassLoader userCodeClassLoader;
+   private final CloseableRegistry cancelables;
+   private final long[] inputWatermarks;
+
+   /** Metric group for the operator. */
+   protected final OperatorMetricGroup metrics;
+   protected final LatencyStats latencyStats;
+   protected final ProcessingTimeService processingTimeService;
+
+   private StreamOperatorStateHandler stateHandler;
+
+   // We keep track of watermarks from both inputs, the combined input is the minimum
+   // Once the minimum advances we emit a new 
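
The archived quote breaks off here. The comment above describes keeping one watermark per input and emitting a combined watermark, the minimum over all inputs, whenever that minimum advances. A minimal sketch of that rule, using a hypothetical `CombinedWatermarkSketch` class and plain `long` timestamps instead of Flink's `Watermark` type:

```java
import java.util.Arrays;
import java.util.OptionalLong;

/**
 * Hypothetical sketch: track one watermark per input and emit the minimum
 * whenever it advances. Not Flink's actual operator code.
 */
public class CombinedWatermarkSketch {

    private final long[] inputWatermarks;
    private long combinedWatermark = Long.MIN_VALUE;

    public CombinedWatermarkSketch(int numberOfInputs) {
        inputWatermarks = new long[numberOfInputs];
        // Inputs that have not reported yet hold the combined watermark back.
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Returns the new combined watermark if the minimum advanced, otherwise empty. */
    public OptionalLong processWatermark(int inputIndex, long watermark) {
        inputWatermarks[inputIndex] = watermark;
        long minimum = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            minimum = Math.min(minimum, w);
        }
        if (minimum > combinedWatermark) {
            combinedWatermark = minimum;
            return OptionalLong.of(minimum);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch tracker = new CombinedWatermarkSketch(2);
        System.out.println(tracker.processWatermark(0, 10).isPresent()); // false: input 1 has not reported
        System.out.println(tracker.processWatermark(1, 5).getAsLong());   // 5
        System.out.println(tracker.processWatermark(1, 20).getAsLong());  // 10: input 0 is now the minimum
        System.out.println(tracker.processWatermark(0, 10).isPresent());  // false: minimum did not advance
    }
}
```

Nothing is emitted until every input has reported at least once, because unseen inputs stay at `Long.MIN_VALUE`.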

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394405466
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperatorFactory.java
 ##
 @@ -58,11 +56,11 @@ public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
}
 
@Override
-   public StreamOperator createStreamOperator(StreamTask containingTask, StreamConfig config, Output output) {
+   public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorInitializer<OUT> initializer) {
 
 Review comment:
   I will go with `StreamOperatorParameters`


[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394850101
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/YieldingOperatorFactory.java
 ##
 @@ -17,10 +17,13 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.annotation.PublicEvolving;
+
 /**
  * An operator that needs access to the {@link MailboxExecutor} to yield to downstream operators needs to be created
  * through a factory implementing this interface.
  */
+@PublicEvolving
 public interface YieldingOperatorFactory<OUT> extends StreamOperatorFactory<OUT> {
 
 Review comment:
   That would contradict this check:
   ```
// yielding operators cannot be chained to legacy sources
if (downStreamOperator instanceof YieldingOperatorFactory) {
    // unfortunately the information that vertices have been chained is not preserved at this point
    return !getHeadOperator(upStreamVertex, streamGraph).isStreamSource();
}
   ```
   What do you think about a mixed pattern? Common parameters in `StreamOperatorParameters`, and "special" or rare ones as decorators?
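   One possible shape of the mixed pattern, sketched with hypothetical types (`Parameters`, `Factory`, `YieldingFactory` and the helper methods are illustrative, not Flink's API). Common parameters travel in one object, while a rare dependency such as a mailbox executor is injected only into factories that declare the extra interface. Because the capability stays visible on the factory type, an `instanceof` check like the chaining rule quoted above keeps working.

   ```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/**
 * Hypothetical sketch of a "mixed" factory pattern: common parameters in one
 * object, rare ones injected through an extra interface. Not Flink's API.
 */
public class MixedFactoryPatternSketch {

    /** Common, always-available construction parameters. */
    static final class Parameters {
        final String operatorName;

        Parameters(String operatorName) {
            this.operatorName = operatorName;
        }
    }

    interface Factory {
        String create(Parameters parameters);
    }

    /** Rare capability, declared as an extra interface on the factory. */
    interface YieldingFactory extends Factory {
        void setMailboxExecutor(Executor executor);
    }

    /** Needs only the common parameters. */
    static final class PlainFactory implements Factory {
        @Override
        public String create(Parameters parameters) {
            return parameters.operatorName;
        }
    }

    /** Additionally needs the executor; here it only checks that it was injected. */
    static final class YieldingMapFactory implements YieldingFactory {
        private Executor executor;

        @Override
        public void setMailboxExecutor(Executor executor) {
            this.executor = executor;
        }

        @Override
        public String create(Parameters parameters) {
            if (executor == null) {
                throw new IllegalStateException("mailbox executor was not injected");
            }
            return parameters.operatorName + " (yielding)";
        }
    }

    /** Runtime side: inject the rare dependency only where the capability is declared. */
    static String instantiate(Factory factory, Parameters parameters, Executor executor) {
        if (factory instanceof YieldingFactory) {
            ((YieldingFactory) factory).setMailboxExecutor(executor);
        }
        return factory.create(parameters);
    }

    /** Graph-building side: the capability remains visible for checks like the chaining rule. */
    static boolean mayChainBehindLegacySource(Factory downstream) {
        return !(downstream instanceof YieldingFactory);
    }

    public static List<String> demo() {
        Executor direct = Runnable::run;
        List<String> results = new ArrayList<>();
        results.add(instantiate(new PlainFactory(), new Parameters("filter"), direct));
        results.add(instantiate(new YieldingMapFactory(), new Parameters("asyncMap"), direct));
        results.add(String.valueOf(mayChainBehindLegacySource(new YieldingMapFactory())));
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [filter, asyncMap (yielding), false]
    }
}
   ```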


[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394828951
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorInitializer.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * Helper  class to construct {@link StreamOperatorBase}. Wraps couple of internal parameters
+ * to simplify for users construction of classes extending {@link StreamOperatorBase} and to
+ * allow for backward compatible changes in the {@link StreamOperatorBase}'s constructor.
+ */
+@Experimental
+public class StreamOperatorInitializer<OUT> {
 
 Review comment:
   Let's keep it simple for now.


[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394843638
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java
 ##
 @@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, replacing previous {@link AbstractStreamOperator}.
+ * Currently intended to work with {@link MultipleInputStreamOperator}.
+ *
+ * One note-able difference in comparison to {@link AbstractStreamOperator} is lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation
+ * in the constructor, and removed some tight coupling with classes like {@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param <OUT> The output type of the operator
+ */
+@Experimental
+public abstract class StreamOperatorBase<OUT> implements StreamOperator<OUT> {
 
 Review comment:
   Renamed to `AbstractStreamOperatorV2`


[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394370772
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+   /** Backend for keyed state. This might be empty if we're not on a keyed stream. */
+   @Nullable
+   private final AbstractKeyedStateBackend keyedStateBackend;
+   private final CloseableRegistry closeableRegistry;
+   @Nullable
+   private final DefaultKeyedStateStore keyedStateStore;
+   private final OperatorStateBackend operatorStateBackend;
+   private final InternalTimeServiceManager timeServiceManager;
+   private final StreamOperatorStateContext context;
+
+   public StreamOperatorStateHandler(
+   StreamOperatorStateContext context,
+   ExecutionConfig executionConfig,
+   CloseableRegistry closeableRegistry) {
+   this.context = context;
+   operatorStateBackend = context.operatorStateBackend();
+   keyedStateBackend = context.keyedStateBackend();
+   this.closeableRegistry = closeableRegistry;
+
+   if (keyedStateBackend != null) {
+   keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+   }
+   else {
+   keyedStateStore = null;
+   }
+
+   timeServiceManager = context.internalTimerServiceManager();
+ 

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394848872
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java
 ##
 @@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, replacing previous {@link AbstractStreamOperator}.
+ * Currently intended to work with {@link MultipleInputStreamOperator}.
+ *
+ * One note-able difference in comparison to {@link AbstractStreamOperator} is lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation
+ * in the constructor, and removed some tight coupling with classes like {@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param <OUT> The output type of the operator
+ */
+@Experimental
+public abstract class StreamOperatorBase<OUT> implements StreamOperator<OUT> {
+   /** The logger used by the operator class and its subclasses. */
+   protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorBase.class);
+
+   protected final StreamConfig config;
+   protected final Output<StreamRecord<OUT>> output;
+   private final StreamingRuntimeContext runtimeContext;
+   private final ExecutionConfig executionConfig;
+   private final ClassLoader userCodeClassLoader;
+   private final CloseableRegistry cancelables;
+   private final long[] inputWatermarks;
+
+   /** Metric group for the operator. */
+   protected final OperatorMetricGroup metrics;
+   protected final LatencyStats latencyStats;
+   protected final ProcessingTimeService processingTimeService;
+
+   private StreamOperatorStateHandler stateHandler;
+
+   // We keep track of watermarks from both inputs, the combined input is the minimum
+   // Once the minimum advances we emit a new 

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394374886
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java
 ##
 @@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link StreamOperatorStateHandler}.
+ */
+public class StreamOperatorStateHandlerTest {
+	/**
+	 * Tests that a failing snapshot method call to the keyed state backend will trigger the closing
+	 * of the StateSnapshotContextSynchronousImpl and the cancellation of the
+	 * OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures.
+	 */
+	@Test
+	public void testFailingBackendSnapshotMethod() throws Exception {
+		final long checkpointId = 42L;
+		final long timestamp = 1L;
+
+		final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+		RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture = new CancelableFuture<>();
+		RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture = new CancelableFuture<>();
+		RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture = new CancelableFuture<>();
+		RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture = new CancelableFuture<>();
+
+		OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(
+			keyedStateManagedFuture,
+			keyedStateRawFuture,
+			operatorStateManagedFuture,
+			operatorStateRawFuture);
+
+		StateSnapshotContextSynchronousImpl context = new TestStateSnapshotContextSynchronousImpl(checkpointId, timestamp, closeableRegistry);
+		context.getRawKeyedOperatorStateOutput();
+		context.getRawOperatorStateOutput();
+
+		StreamTaskStateInitializerImpl stateInitializer =
+			new StreamTaskStateInitializerImpl(new MockEnvironmentBuilder().build(), new MemoryStateBackend());
+		StreamOperatorStateContext stateContext = stateInitializer.streamOperatorStateContext(
+			new OperatorID(),
+  

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394831873
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorFactory.java
 ##
 @@ -61,16 +59,16 @@ public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
 	}
 
 	@Override
-	public StreamOperator createStreamOperator(StreamTask containingTask, StreamConfig config, Output output) {
+	public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorInitializer initializer) {
 
 Review comment:
   I know, but I had to do it for now, because of compile errors:
   
   ```
   Error:(62, 31) java: name clash: createStreamOperator(org.apache.flink.streaming.api.operators.StreamOperatorParameters) in org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory and createStreamOperator(org.apache.flink.streaming.api.operators.StreamOperatorParameters) in org.apache.flink.streaming.api.operators.StreamOperatorFactory have the same erasure, yet neither overrides the other
   ```
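
For context on the compile error above: javac reports "name clash ... have the same erasure, yet neither overrides the other" when a method in a subtype erases to the same signature as an inherited method but is not a valid override of it (JLS 8.4.8.3). Below is a minimal sketch of one way to satisfy a generic factory method of the shape `<T extends StreamOperator<OUT>> T createStreamOperator(...)`. It assumes hypothetical stand-in types `Operator`, `Parameters` and `OperatorFactory` (not Flink's classes), and it is not necessarily how this PR resolved the error: the implementing class redeclares the method-level type parameter with the same bound and returns through an unchecked cast.

```java
public class ErasureClashDemo {

    /** Hypothetical stand-in for StreamOperator. */
    interface Operator<OUT> {
        OUT name();
    }

    /** Hypothetical stand-in for the operator parameters / initializer object. */
    static final class Parameters<OUT> {
        final String label;

        Parameters(String label) {
            this.label = label;
        }
    }

    /** Hypothetical stand-in for StreamOperatorFactory with a generic factory method. */
    interface OperatorFactory<OUT> {
        <T extends Operator<OUT>> T createOperator(Parameters<OUT> parameters);
    }

    static final class StringOperatorFactory implements OperatorFactory<String> {
        // Redeclaring T with the same bound and the same parameter type makes this a real
        // override. A parameter of Parameters<Integer> would erase to the same signature
        // without overriding, which javac rejects as a name clash.
        @Override
        @SuppressWarnings("unchecked")
        public <T extends Operator<String>> T createOperator(Parameters<String> parameters) {
            Operator<String> operator = () -> "op-" + parameters.label;
            // Unchecked: T erases to Operator, so the cast cannot be verified at runtime.
            return (T) operator;
        }
    }

    public static void main(String[] args) {
        OperatorFactory<String> factory = new StringOperatorFactory();
        // T is inferred from the assignment target as Operator<String>.
        Operator<String> operator = factory.createOperator(new Parameters<>("async"));
        System.out.println(operator.name()); // prints op-async
    }
}
```

The unchecked cast is the price of letting the caller choose `T`: the compiler cannot prove that the concrete operator matches whatever subtype the caller asks for.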


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394375674
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java
 ##
 @@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link StreamOperatorStateHandler}.
+ */
+public class StreamOperatorStateHandlerTest {
+	/**
+	 * Tests that a failing snapshot method call to the keyed state backend will trigger the closing
+	 * of the StateSnapshotContextSynchronousImpl and the cancellation of the
+	 * OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures.
+	 */
+	@Test
+	public void testFailingBackendSnapshotMethod() throws Exception {
+		final long checkpointId = 42L;
+		final long timestamp = 1L;
+
+		final CloseableRegistry closeableRegistry = new CloseableRegistry();
+
+		RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture = new CancelableFuture<>();
+		RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture = new CancelableFuture<>();
+		RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture = new CancelableFuture<>();
+		RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture = new CancelableFuture<>();
+
+		OperatorSnapshotFutures operatorSnapshotResult = new OperatorSnapshotFutures(
+			keyedStateManagedFuture,
+			keyedStateRawFuture,
+			operatorStateManagedFuture,
+			operatorStateRawFuture);
+
+		StateSnapshotContextSynchronousImpl context = new TestStateSnapshotContextSynchronousImpl(checkpointId, timestamp, closeableRegistry);
+		context.getRawKeyedOperatorStateOutput();
+		context.getRawOperatorStateOutput();
+
+		StreamTaskStateInitializerImpl stateInitializer =
+			new StreamTaskStateInitializerImpl(new MockEnvironmentBuilder().build(), new MemoryStateBackend());
+		StreamOperatorStateContext stateContext = stateInitializer.streamOperatorStateContext(
+			new OperatorID(),
+  

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394369655
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
 ##
 @@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.runtime.state.StatePartitionStreamProvider;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.util.CloseableIterable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Class encapsulating various state backend handling logic for {@link StreamOperator} implementations.
+ */
+@PublicEvolving
+public class StreamOperatorStateHandler {
+
+	protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);
+
+	/** Backend for keyed state. This might be empty if we're not on a keyed stream. */
+	@Nullable
+	private final AbstractKeyedStateBackend keyedStateBackend;
+	private final CloseableRegistry closeableRegistry;
+	@Nullable
+	private final DefaultKeyedStateStore keyedStateStore;
+	private final OperatorStateBackend operatorStateBackend;
+	private final InternalTimeServiceManager timeServiceManager;
+	private final StreamOperatorStateContext context;
+
+	public StreamOperatorStateHandler(
+			StreamOperatorStateContext context,
+			ExecutionConfig executionConfig,
+			CloseableRegistry closeableRegistry) {
+		this.context = context;
+		operatorStateBackend = context.operatorStateBackend();
+		keyedStateBackend = context.keyedStateBackend();
+		this.closeableRegistry = closeableRegistry;
+
+		if (keyedStateBackend != null) {
+			keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
+		}
+		else {
+			keyedStateStore = null;
+		}
+
+		timeServiceManager = context.internalTimerServiceManager();
+ 

[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-19 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r394373171
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.java
 ##
 @@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link StreamOperatorStateHandler}.
+ */
+public class StreamOperatorStateHandlerTest {
+	/**
+	 * Tests that a failing snapshot method call to the keyed state backend will trigger the closing
+	 * of the StateSnapshotContextSynchronousImpl and the cancellation of the
+	 * OperatorSnapshotResult. The latter is supposed to also cancel all assigned futures.
+	 */
+	@Test
+	public void testFailingBackendSnapshotMethod() throws Exception {
+		final long checkpointId = 42L;
+		final long timestamp = 1L;
+
+		final CloseableRegistry closeableRegistry = new CloseableRegistry();
 
 Review comment:
   I don't know, but closing doesn't hurt :)
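
The test javadoc quoted above expects a failing keyed-backend snapshot to close the snapshot context and cancel every future held by `OperatorSnapshotFutures`. The reply suggests closing the `CloseableRegistry` in the test anyway. Below is a JDK-only sketch of that cleanup pattern. It uses hypothetical stand-ins (`SimpleRegistry`, `SnapshotContext`, `SnapshotFutures`, `snapshotState`) rather than Flink's `CloseableRegistry`, `StateSnapshotContextSynchronousImpl` and `OperatorSnapshotFutures`, so treat it as an illustration of the idea, not of Flink's implementation.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class SnapshotCleanupSketch {

    /** Hypothetical stand-in for CloseableRegistry: closes everything registered with it. */
    static final class SimpleRegistry implements Closeable {
        private final List<Closeable> registered = new ArrayList<>();

        void register(Closeable closeable) {
            registered.add(closeable);
        }

        @Override
        public void close() {
            for (Closeable closeable : registered) {
                try {
                    closeable.close();
                } catch (Exception ignored) {
                    // Best-effort cleanup in this sketch; a real registry would collect and rethrow.
                }
            }
            registered.clear();
        }
    }

    /** Hypothetical stand-in for StateSnapshotContextSynchronousImpl. */
    static final class SnapshotContext implements Closeable {
        boolean closed;

        @Override
        public void close() {
            closed = true; // idempotent, so closing twice is harmless
        }
    }

    /** Hypothetical stand-in for OperatorSnapshotFutures. */
    static final class SnapshotFutures {
        final List<RunnableFuture<String>> futures = new ArrayList<>();

        SnapshotFutures(int count) {
            for (int i = 0; i < count; i++) {
                futures.add(new FutureTask<>(() -> "state-handle"));
            }
        }

        void cancelAll() {
            for (RunnableFuture<String> future : futures) {
                future.cancel(true);
            }
        }
    }

    /** Simulated keyed-backend snapshot that always fails. */
    static void snapshotKeyedBackend() throws Exception {
        throw new Exception("keyed backend snapshot failed");
    }

    /** On failure: close the context, cancel every future, then rethrow. */
    static void snapshotState(SnapshotContext context, SnapshotFutures futures) {
        try {
            snapshotKeyedBackend();
        } catch (Exception e) {
            context.close();
            futures.cancelAll();
            throw new IllegalStateException("Could not complete snapshot", e);
        }
    }

    public static void main(String[] args) {
        SnapshotContext context = new SnapshotContext();
        SnapshotFutures futures = new SnapshotFutures(4);
        // try-with-resources closes the registry even though snapshotState throws.
        try (SimpleRegistry registry = new SimpleRegistry()) {
            registry.register(context);
            snapshotState(context, futures);
        } catch (IllegalStateException expected) {
            System.out.println("snapshot failed: " + expected.getCause().getMessage());
        }
        boolean allCancelled = futures.futures.stream().allMatch(RunnableFuture::isCancelled);
        System.out.println("context closed: " + context.closed + ", all cancelled: " + allCancelled);
    }
}
```

Because both close operations in the sketch are idempotent, closing the registry after the context has already been closed on the failure path has no side effects.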




[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-13 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392202032
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorBase.java
 ##
 @@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.LatencyStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Optional;
+
+/**
+ * New base class for all stream operators, replacing the previous {@link AbstractStreamOperator}.
+ * Currently intended to work with {@link MultipleInputStreamOperator}.
+ *
+ * One notable difference in comparison to {@link AbstractStreamOperator} is the lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation
+ * in the constructor, and the removal of some tight coupling with classes like {@link StreamTask}.
+ *
+ * Methods are guaranteed not to be called concurrently.
+ *
+ * @param <OUT> The output type of the operator
+ */
+@Experimental
+public abstract class StreamOperatorBase<OUT> implements StreamOperator<OUT> {
 
 Review comment:
   alternative name could be `AbstractStreamOperatorV2`?
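
The javadoc quoted above says the new base class drops `AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)` in favor of initialisation in the constructor. Below is a minimal sketch of the difference. All names in it (`OperatorParameters`, `TwoPhaseOperator`, `ConstructorInitializedOperator`, `UpperCaseOperator`) are hypothetical and not Flink's API. With constructor injection the fields can be `final`, and the operator cannot be observed before its dependencies are set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

public class ConstructorInitSketch {

    /** Hypothetical parameter object bundling what setup() used to receive. */
    static final class OperatorParameters {
        final String taskName;
        final List<String> output;

        OperatorParameters(String taskName, List<String> output) {
            this.taskName = Objects.requireNonNull(taskName);
            this.output = Objects.requireNonNull(output);
        }
    }

    /** Old style: fields are assigned later in setup(), so they cannot be final. */
    abstract static class TwoPhaseOperator {
        protected String taskName;     // null until setup() runs
        protected List<String> output; // null until setup() runs

        void setup(OperatorParameters parameters) {
            this.taskName = parameters.taskName;
            this.output = parameters.output;
        }

        abstract void processElement(String value);
    }

    /** New style: everything is injected through the constructor, so fields can be final. */
    abstract static class ConstructorInitializedOperator {
        protected final String taskName;
        protected final List<String> output;

        ConstructorInitializedOperator(OperatorParameters parameters) {
            this.taskName = parameters.taskName;
            this.output = parameters.output;
        }

        abstract void processElement(String value);
    }

    static final class UpperCaseOperator extends ConstructorInitializedOperator {
        UpperCaseOperator(OperatorParameters parameters) {
            super(parameters);
        }

        @Override
        void processElement(String value) {
            output.add(taskName + ":" + value.toUpperCase(Locale.ROOT));
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        // Usable as soon as it is constructed; there is no separate setup() call to forget.
        UpperCaseOperator operator = new UpperCaseOperator(new OperatorParameters("map-1", sink));
        operator.processElement("flink");
        System.out.println(sink); // prints [map-1:FLINK]
    }
}
```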




[GitHub] [flink] pnowojski commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator

2020-03-13 Thread GitBox
pnowojski commented on a change in pull request #11403: 
[FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for 
AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392201502
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
 ##
 @@ -57,7 +57,13 @@
 			((ProcessingTimeServiceAware) operatorFactory).setProcessingTimeService(processingTimeService);
 		}
 
-		OP op = operatorFactory.createStreamOperator(containingTask, configuration, output);
+		// TODO: what to do with ProcessingTimeServiceAware?
+		OP op = operatorFactory.createStreamOperator(
+			new StreamOperatorInitializer<>(
+				containingTask,
+				configuration,
+				output,
+				processingTimeService));
 
 Review comment:
   @AHeise what do you think about including `processingTimeService` in `StreamOperatorInitializer` always, regardless of the `ProcessingTimeServiceAware`? Generally speaking, what do you think about `StreamOperatorInitializer`?
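
The question above weighs two ways of handing the processing-time service to operator factories. One is the existing opt-in `ProcessingTimeServiceAware` setter. The other is to always carry the service in `StreamOperatorInitializer`. Below is a sketch of both options. All types in it (`TimeService`, `TimeServiceAware`, `AwareFactory`, `OperatorInitializer`, `Operator`) are hypothetical simplifications, not the PR's actual classes.

```java
import java.util.Objects;

public class InitializerSketch {

    /** Hypothetical minimal processing-time service. */
    interface TimeService {
        long currentProcessingTime();
    }

    interface Operator {
        long startTime();
    }

    /** Option A: factories opt in through a setter-based "aware" interface. */
    interface TimeServiceAware {
        void setTimeService(TimeService timeService);
    }

    /** Option A: the factory holds mutable state that must be set before createOperator(). */
    static final class AwareFactory implements TimeServiceAware {
        private TimeService timeService; // null until the caller remembers to set it

        @Override
        public void setTimeService(TimeService timeService) {
            this.timeService = timeService;
        }

        Operator createOperator(String config) {
            long start = Objects.requireNonNull(timeService, "time service not set").currentProcessingTime();
            return () -> start;
        }
    }

    /** Option B: the parameter object always carries the time service. */
    static final class OperatorInitializer {
        final String config;
        final TimeService timeService;

        OperatorInitializer(String config, TimeService timeService) {
            this.config = Objects.requireNonNull(config);
            this.timeService = Objects.requireNonNull(timeService);
        }
    }

    /** Option B: any factory can read the service without implementing an extra interface. */
    static Operator createOperator(OperatorInitializer initializer) {
        long start = initializer.timeService.currentProcessingTime();
        return () -> start;
    }

    public static void main(String[] args) {
        TimeService fixedClock = () -> 1_000L; // deterministic clock for the example
        Operator operator = createOperator(new OperatorInitializer("cfg", fixedClock));
        System.out.println(operator.startTime()); // prints 1000
    }
}
```

Under these assumptions, option B removes the mutable factory state and the requirement to call the setter before `createOperator`. The cost is that the service is also passed to factories that never use it.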

