[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-25 Thread GitBox
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220213096
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards 
compatibility purposes.
+ *
+ * In older versions of Flink (<= 1.6), we used to write state serializers 
into checkpoints, along
+ * with the serializer's configuration snapshot. Since 1.7.0, we no longer 
wrote the serializers, but
+ * instead used the configuration snapshot as a factory to instantiate 
serializers for restoring state.
+ * However, since some outdated implementations of configuration snapshots did 
not contain sufficient
 
 Review comment:
   `did` -> `do`. I would use the present tense here, as you are describing the 
current state, aren't you?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
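
For illustration, a minimal, simplified sketch (not taken from the PR) of what such 
a backwards-compatibility wrapper can look like: it keeps the Java-deserialized 
serializer that older (<= 1.6) checkpoints still contain and hands it back from 
restoreSerializer(), while delegating versioning to the wrapped snapshot. The class 
and field names are invented; the required nullary constructor and the read/write 
methods are omitted for brevity.

    // Simplified, hypothetical sketch; the actual class in the PR differs.
    public class LegacyWrappingConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {

        // Config snapshot written alongside the serializer in the old (<= 1.6) format.
        private final TypeSerializerConfigSnapshot<T> wrappedSnapshot;

        // Serializer that was Java-deserialized from the old checkpoint.
        private final TypeSerializer<T> legacySerializer;

        public LegacyWrappingConfigSnapshot(
                TypeSerializerConfigSnapshot<T> wrappedSnapshot,
                TypeSerializer<T> legacySerializer) {
            this.wrappedSnapshot = wrappedSnapshot;
            this.legacySerializer = legacySerializer;
        }

        public TypeSerializer<T> restoreSerializer() {
            // The old checkpoint still carries the serializer itself, so use it directly.
            return legacySerializer;
        }

        public int getVersion() {
            return wrappedSnapshot.getVersion();
        }
    }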


[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-25 Thread GitBox
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220212410
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards 
compatibility purposes.
+ *
+ * In older versions of Flink (<= 1.6), we used to write state serializers 
into checkpoints, along
+ * with the serializer's configuration snapshot. Since 1.7.0, we no longer 
wrote the serializers, but
+ * instead used the configuration snapshot as a factory to instantiate 
serializers for restoring state.
 
 Review comment:
   `used` -> `use`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-25 Thread GitBox
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220227910
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 ##
 @@ -21,35 +21,131 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+
 /**
  * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link 
TypeSerializer's} configuration.
- * The configuration snapshot of a serializer is persisted along with 
checkpoints of the managed state that the
- * serializer is registered to.
+ * The configuration snapshot of a serializer is persisted within checkpoints
+ * as a single source of meta information about the schema of serialized data 
in the checkpoint.
+ * This serves three purposes:
+ *
+ * 
+ *   Capturing serializer parameters and schema: a 
serializer's configuration snapshot
+ *   represents information about the parameters, state, and schema of a 
serializer.
+ *   This is explained in more detail below.
  *
- * The persisted configuration may later on be used by new serializers to 
ensure serialization compatibility
- * for the same managed state. In order for new serializers to be able to 
ensure this, the configuration snapshot
- * should encode sufficient information about:
+ *   Compatibility checks for new serializers: when new 
serializers are available,
+ *   they need to be checked whether or not they are compatible to read the 
data written by the previous serializer.
+ *   This is performed by providing the new serializer to the correspondibng 
serializer configuration
+ *   snapshots in checkpoints.
+ *
+ *   Factory for a read serializer when schema conversion is 
required: in the case that new
+ *   serializers are not compatible to read previous data, a schema conversion 
process executed across all data
+ *   is required before the new serializer can be continued to be used. This 
conversion process requires a compatible
+ *   read serializer to restore serialized bytes as objects, and then written 
back again using the new serializer.
+ *   In this scenario, the serializer configuration snapshots in checkpoints 
doubles as a factory for the read
+ *   serializer of the conversion process.
+ * 
+ *
+ * Serializer Configuration and Schema
+ *
+ * Since serializer configuration snapshots needs to be used to ensure 
serialization compatibility
+ * for the same managed state as well as serving as a factory for compatible 
read serializers, the configuration
+ * snapshot should encode sufficient information about:
  *
  * 
  *   Parameter settings of the serializer: parameters of 
the serializer include settings
  *   required to setup the serializer, or the state of the serializer if it is 
stateful. If the serializer
  *   has nested serializers, then the configuration snapshot should also 
contain the parameters of the nested
  *   serializers.
  *
- *   Serialization schema of the serializer: the data 
format used by the serializer.
+ *   Serialization schema of the serializer: the binary 
format used by the serializer, or
+ *   in other words, the schema of data written by the serializer.
  * 
  *
  * NOTE: Implementations must contain the default empty nullary 
constructor. This is required to be able to
  * deserialize the configuration snapshot from its binary form.
+ *
+ * @param <T> The data type that the originating serializer of this 
configuration serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializerConfigSnapshot extends 
VersionedIOReadableWritable {
+public abstract class TypeSerializerConfigSnapshot<T> extends 
VersionedIOReadableWritable {
 
/** The user code class loader; only relevant if this configuration 
instance was deserialized from binary form. */
private ClassLoader userCodeClassLoader;
 
+   /**
+* The originating serializer of this configuration snapshot.
+*
+* TODO to allow for incrementally adapting the implementation of 
serializer config snapshot subclasses,
+* TODO we currently have a base implementation for the {@link 
#restoreSerializer()}
+* TODO method which simply returns this serializer instance. The 
serializer is written
+* TODO and read using Java serialization as part of reading / writing 
the config snapshot
+*/
+   private TypeSerializer<T> serializer;
+
+   /**
+* Creates a serializer using this configuration, that is capable of 
reading data
+* written by the serializer described by this configuration.
+*
+* @return the restored serializer.
+ 
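
For illustration, a minimal hypothetical subclass following the contract described 
in the javadoc above (nullary constructor, versioned read/write of the serializer's 
parameters, and the restoreSerializer() factory method). The serializer and its 
single "length" parameter are invented for the example; equals/hashCode are omitted.

    // Hypothetical example, not part of the PR.
    public class FixedLengthStringSerializerConfigSnapshot
            extends TypeSerializerConfigSnapshot<String> {

        private int length;

        // Nullary constructor, required so the snapshot can be
        // deserialized from its binary form.
        public FixedLengthStringSerializerConfigSnapshot() {}

        public FixedLengthStringSerializerConfigSnapshot(int length) {
            this.length = length;
        }

        public int getVersion() {
            return 1;
        }

        public void write(DataOutputView out) throws IOException {
            super.write(out);      // writes the snapshot version
            out.writeInt(length);  // the serializer's only parameter
        }

        public void read(DataInputView in) throws IOException {
            super.read(in);
            this.length = in.readInt();
        }

        // Since 1.7.0 the snapshot doubles as a factory for a serializer that
        // can read data written under this configuration.
        public TypeSerializer<String> restoreSerializer() {
            return new FixedLengthStringSerializer(length); // hypothetical serializer class
        }
    }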

[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627517#comment-16627517
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220227910
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 ##
 @@ -21,35 +21,131 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+
 /**
  * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link 
TypeSerializer's} configuration.
- * The configuration snapshot of a serializer is persisted along with 
checkpoints of the managed state that the
- * serializer is registered to.
+ * The configuration snapshot of a serializer is persisted within checkpoints
+ * as a single source of meta information about the schema of serialized data 
in the checkpoint.
+ * This serves three purposes:
+ *
+ * 
+ *   Capturing serializer parameters and schema: a 
serializer's configuration snapshot
+ *   represents information about the parameters, state, and schema of a 
serializer.
+ *   This is explained in more detail below.
  *
- * The persisted configuration may later on be used by new serializers to 
ensure serialization compatibility
- * for the same managed state. In order for new serializers to be able to 
ensure this, the configuration snapshot
- * should encode sufficient information about:
+ *   Compatibility checks for new serializers: when new 
serializers are available,
+ *   they need to be checked whether or not they are compatible to read the 
data written by the previous serializer.
+ *   This is performed by providing the new serializer to the correspondibng 
serializer configuration
+ *   snapshots in checkpoints.
+ *
+ *   Factory for a read serializer when schema conversion is 
required: in the case that new
+ *   serializers are not compatible to read previous data, a schema conversion 
process executed across all data
+ *   is required before the new serializer can be continued to be used. This 
conversion process requires a compatible
+ *   read serializer to restore serialized bytes as objects, and then written 
back again using the new serializer.
+ *   In this scenario, the serializer configuration snapshots in checkpoints 
doubles as a factory for the read
+ *   serializer of the conversion process.
+ * 
+ *
+ * Serializer Configuration and Schema
+ *
+ * Since serializer configuration snapshots needs to be used to ensure 
serialization compatibility
+ * for the same managed state as well as serving as a factory for compatible 
read serializers, the configuration
+ * snapshot should encode sufficient information about:
  *
  * 
  *   Parameter settings of the serializer: parameters of 
the serializer include settings
  *   required to setup the serializer, or the state of the serializer if it is 
stateful. If the serializer
  *   has nested serializers, then the configuration snapshot should also 
contain the parameters of the nested
  *   serializers.
  *
- *   Serialization schema of the serializer: the data 
format used by the serializer.
+ *   Serialization schema of the serializer: the binary 
format used by the serializer, or
+ *   in other words, the schema of data written by the serializer.
  * 
  *
  * NOTE: Implementations must contain the default empty nullary 
constructor. This is required to be able to
  * deserialize the configuration snapshot from its binary form.
+ *
+ * @param <T> The data type that the originating serializer of this 
configuration serializes.
  */
 @PublicEvolving
-public abstract class TypeSerializerConfigSnapshot extends 
VersionedIOReadableWritable {
+public abstract class TypeSerializerConfigSnapshot<T> extends 
VersionedIOReadableWritable {
 
/** The user code class loader; only relevant if this configuration 
instance was deserialized from binary form. */
private ClassLoader userCodeClassLoader;
 
+   /**
+* The originating serializer of this configuration snapshot.
+*
+* TODO to allow for incrementally adapting the implementation of 
serializer config snapshot subclasses,
+* TODO we currently have a base implementation for the {@link 
#restoreSerializer()}
+* TODO method which simply returns this serializer instance. The 
serializer is written
+* TODO and read using Java serialization as part of reading / writing 
the config snapshot
+*/
+   private TypeSe

[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627519#comment-16627519
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220232529
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
 ##
 @@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@code TypeSerializerSchemaCompatibility} represents information about 
whether or not a {@link TypeSerializer}
+ * can be safely used to read data written by a previous type serializer.
+ *
+ * Typically, the compatibility of the new serializer is resolved by 
checking it against the snapshotted
+ * {@link TypeSerializerConfigSnapshot} of the previous serializer. Depending 
on the type of the
+ * resolved compatibility result, migration (i.e., reading bytes with the 
previous serializer and then writing
+ * it again with the new serializer) may be required before the new serializer 
can be used.
+ *
+ * @see TypeSerializer
+ * @see TypeSerializerConfigSnapshot#resolveSchemaCompatibility(TypeSerializer)
+ */
+@PublicEvolving
+public class TypeSerializerSchemaCompatibility<T> {
+
+   /**
+* Enum for the type of the compatibility.
+*/
+   enum Type {
+
+   /** This indicates that the new serializer continued to be used 
as is. */
+   COMPATIBLE_AS_IS,
+
+   /**
+* This indicates that it is required to reconfigure the new 
serializer before
+* it can be used. The reconfigured serializer should be 
provided as part of the
+* resolved {@link TypeSerializerSchemaCompatibility} result.
+*/
+   COMPATIBLE_AFTER_RECONFIGURATION,
+
+   /**
+* This indicates that it is possible to use the new serializer 
after performing a
+* full-scan migration over all state, by reading bytes with 
the previous serializer
+* and then writing it again with the new serializer, 
effectively converting the
+* serialization schema to correspond to the new serializer.
+*/
+   COMPATIBLE_AFTER_MIGRATION,
+
+   /**
+* This indicates that the new serializer is incompatible, even 
with migration.
+* This normally implies that the deserialized Java class can 
not be commonly recognized
+* by the previous and new serializer.
+*/
+   INCOMPATIBLE
+   }
+
+   /**
+* The type of the compatibility.
+*/
+   private final Type resultType;
+
+   /**
+* The reconfigured new serializer to use. This is only relevant
+* in the case that the type of the compatibility is {@link 
Type#COMPATIBLE_AFTER_RECONFIGURATION}.
+*/
+   private final TypeSerializer<T> reconfiguredNewSerializer;
+
+   /**
+* Returns a result that indicates that the new serializer is 
compatible and no migration is required.
+* The new serializer can continued to be used as is.
+*
+* @return a result that indicates migration is not required for the 
new serializer.
+*/
+   public static <T> TypeSerializerSchemaCompatibility<T> compatibleAsIs() 
{
+   return new 
TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AS_IS, null);
+   }
+
+   /**
+* Returns a result that indicates that no migration is required, but 
the new serializer had to be
+* reconfigured in order for it to be compatible. A reconfigured 
serializer is provided and
+* should be used instead.
+*
+* @param reconfiguredSerializer the reconfi
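
For illustration, a hedged sketch of how a restore path might branch on the four 
result types above. Only the enum constants, resolveSchemaCompatibility() and 
restoreSerializer() appear in the quoted code; the accessor names getType() and 
getReconfiguredSerializer() are assumptions, since the excerpt is truncated before 
the getters.

    // Hypothetical helper, not part of the PR.
    static <T> TypeSerializer<T> pickSerializerForReading(
            TypeSerializerConfigSnapshot<T> previousSnapshot,
            TypeSerializer<T> newSerializer) {

        TypeSerializerSchemaCompatibility<T> result =
                previousSnapshot.resolveSchemaCompatibility(newSerializer);

        switch (result.getType()) {
            case COMPATIBLE_AS_IS:
                // The new serializer can read the existing data directly.
                return newSerializer;
            case COMPATIBLE_AFTER_RECONFIGURATION:
                // A reconfigured variant of the new serializer must be used instead.
                return result.getReconfiguredSerializer();
            case COMPATIBLE_AFTER_MIGRATION:
                // Existing bytes must first be read with a serializer restored from the
                // snapshot, then written back with the new serializer (full-scan migration).
                return previousSnapshot.restoreSerializer();
            case INCOMPATIBLE:
            default:
                throw new IllegalStateException("The new serializer cannot read the existing state.");
        }
    }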

[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-25 Thread GitBox
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220211873
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards 
compatibility purposes.
+ *
+ * In older versions of Flink (<= 1.6), we used to write state serializers 
into checkpoints, along
+ * with the serializer's configuration snapshot. Since 1.7.0, we no longer 
wrote the serializers, but
 
 Review comment:
   `wrote` -> `write`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-25 Thread GitBox
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220203782
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 ##
 @@ -21,35 +21,131 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+
 /**
  * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link 
TypeSerializer's} configuration.
- * The configuration snapshot of a serializer is persisted along with 
checkpoints of the managed state that the
- * serializer is registered to.
+ * The configuration snapshot of a serializer is persisted within checkpoints
+ * as a single source of meta information about the schema of serialized data 
in the checkpoint.
+ * This serves three purposes:
+ *
+ * 
+ *   Capturing serializer parameters and schema: a 
serializer's configuration snapshot
+ *   represents information about the parameters, state, and schema of a 
serializer.
+ *   This is explained in more detail below.
  *
- * The persisted configuration may later on be used by new serializers to 
ensure serialization compatibility
- * for the same managed state. In order for new serializers to be able to 
ensure this, the configuration snapshot
- * should encode sufficient information about:
+ *   Compatibility checks for new serializers: when new 
serializers are available,
+ *   they need to be checked whether or not they are compatible to read the 
data written by the previous serializer.
+ *   This is performed by providing the new serializer to the correspondibng 
serializer configuration
 
 Review comment:
   `correspondibng` -> `corresponding`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state backends] Remove serializers from checkpoints

2018-09-25 Thread GitBox
dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220232529
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSchemaCompatibility.java
 ##
 @@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@code TypeSerializerSchemaCompatibility} represents information about 
whether or not a {@link TypeSerializer}
+ * can be safely used to read data written by a previous type serializer.
+ *
+ * Typically, the compatibility of the new serializer is resolved by 
checking it against the snapshotted
+ * {@link TypeSerializerConfigSnapshot} of the previous serializer. Depending 
on the type of the
+ * resolved compatibility result, migration (i.e., reading bytes with the 
previous serializer and then writing
+ * it again with the new serializer) may be required before the new serializer 
can be used.
+ *
+ * @see TypeSerializer
+ * @see TypeSerializerConfigSnapshot#resolveSchemaCompatibility(TypeSerializer)
+ */
+@PublicEvolving
+public class TypeSerializerSchemaCompatibility<T> {
+
+   /**
+* Enum for the type of the compatibility.
+*/
+   enum Type {
+
+   /** This indicates that the new serializer continued to be used 
as is. */
+   COMPATIBLE_AS_IS,
+
+   /**
+* This indicates that it is required to reconfigure the new 
serializer before
+* it can be used. The reconfigured serializer should be 
provided as part of the
+* resolved {@link TypeSerializerSchemaCompatibility} result.
+*/
+   COMPATIBLE_AFTER_RECONFIGURATION,
+
+   /**
+* This indicates that it is possible to use the new serializer 
after performing a
+* full-scan migration over all state, by reading bytes with 
the previous serializer
+* and then writing it again with the new serializer, 
effectively converting the
+* serialization schema to correspond to the new serializer.
+*/
+   COMPATIBLE_AFTER_MIGRATION,
+
+   /**
+* This indicates that the new serializer is incompatible, even 
with migration.
+* This normally implies that the deserialized Java class can 
not be commonly recognized
+* by the previous and new serializer.
+*/
+   INCOMPATIBLE
+   }
+
+   /**
+* The type of the compatibility.
+*/
+   private final Type resultType;
+
+   /**
+* The reconfigured new serializer to use. This is only relevant
+* in the case that the type of the compatibility is {@link 
Type#COMPATIBLE_AFTER_RECONFIGURATION}.
+*/
+   private final TypeSerializer<T> reconfiguredNewSerializer;
+
+   /**
+* Returns a result that indicates that the new serializer is 
compatible and no migration is required.
+* The new serializer can continued to be used as is.
+*
+* @return a result that indicates migration is not required for the 
new serializer.
+*/
+   public static <T> TypeSerializerSchemaCompatibility<T> compatibleAsIs() 
{
+   return new 
TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AS_IS, null);
+   }
+
+   /**
+* Returns a result that indicates that no migration is required, but 
the new serializer had to be
+* reconfigured in order for it to be compatible. A reconfigured 
serializer is provided and
+* should be used instead.
+*
+* @param reconfiguredSerializer the reconfigured new serializer that 
should be used.
+*
+* @return a result that indicates migration is not required, but a 
reconfigured version of the new
+* serializer should be used.
+*/
+   public static  TypeSerializerSchemaC

[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627516#comment-16627516
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220213096
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards 
compatibility purposes.
+ *
+ * In older versions of Flink (<= 1.6), we used to write state serializers 
into checkpoints, along
+ * with the serializer's configuration snapshot. Since 1.7.0, we no longer 
wrote the serializers, but
+ * instead used the configuration snapshot as a factory to instantiate 
serializers for restoring state.
+ * However, since some outdated implementations of configuration snapshots did 
not contain sufficient
 
 Review comment:
   `did` -> `do`. I would use the present tense here, as you are describing the 
current state, aren't you?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run, already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) 
> to perform even a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627521#comment-16627521
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220212410
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards 
compatibility purposes.
+ *
+ * In older versions of Flink (<= 1.6), we used to write state serializers 
into checkpoints, along
+ * with the serializer's configuration snapshot. Since 1.7.0, we no longer 
wrote the serializers, but
+ * instead used the configuration snapshot as a factory to instantiate 
serializers for restoring state.
 
 Review comment:
   `used` -> `use`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run, already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) 
> to perform even a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627518#comment-16627518
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220211873
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/BackwardsCompatibleConfigSnapshot.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A utility {@link TypeSerializerConfigSnapshot} that is used for backwards 
compatibility purposes.
+ *
+ * In older versions of Flink (<= 1.6), we used to write state serializers 
into checkpoints, along
+ * with the serializer's configuration snapshot. Since 1.7.0, we no longer 
wrote the serializers, but
 
 Review comment:
   `wrote` -> `write`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run, already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) 
> to perform even a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627520#comment-16627520
 ] 

ASF GitHub Bot commented on FLINK-9377:
---

dawidwys commented on a change in pull request #6711: [FLINK-9377] [core, state 
backends] Remove serializers from checkpoints
URL: https://github.com/apache/flink/pull/6711#discussion_r220203782
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 ##
 @@ -21,35 +21,131 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+
 /**
  * A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link 
TypeSerializer's} configuration.
- * The configuration snapshot of a serializer is persisted along with 
checkpoints of the managed state that the
- * serializer is registered to.
+ * The configuration snapshot of a serializer is persisted within checkpoints
+ * as a single source of meta information about the schema of serialized data 
in the checkpoint.
+ * This serves three purposes:
+ *
+ * 
+ *   Capturing serializer parameters and schema: a 
serializer's configuration snapshot
+ *   represents information about the parameters, state, and schema of a 
serializer.
+ *   This is explained in more detail below.
  *
- * The persisted configuration may later on be used by new serializers to 
ensure serialization compatibility
- * for the same managed state. In order for new serializers to be able to 
ensure this, the configuration snapshot
- * should encode sufficient information about:
+ *   Compatibility checks for new serializers: when new 
serializers are available,
+ *   they need to be checked whether or not they are compatible to read the 
data written by the previous serializer.
+ *   This is performed by providing the new serializer to the correspondibng 
serializer configuration
 
 Review comment:
   `correspondibng` -> `corresponding`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run, already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) 
> to perform even a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-09-25 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627538#comment-16627538
 ] 

vinoyang commented on FLINK-10292:
--

[~uce] Thank you for verifying my understanding of "image". Here are some of my 
thoughts on the questions you asked earlier. In fact, I think we cannot avoid 
thinking about running in an image (container), but we also cannot consider only 
the image scenario. In my opinion, there is no essential difference between 
standalone mode and running in docker/k8s (virtualization can be seen as a 
micro-standalone model; it also depends on Zookeeper in the same environment).
 * Regarding the scenario of killing rather than canceling a job: the resulting 
metadata persists, and the same happens in standalone mode. This cannot be avoided, 
because it is not a normal operation.
 * Regarding your point that "update the image with a modified application" may 
cause the new JobGraph to fail to overwrite the old JobGraph: this should not 
happen, because it can be seen as a job upgrade. To achieve this, the first step 
should be to execute "cancel with savepoint"; the job is then resubmitted (and we 
should recognize it and update the metadata of the JobGraph stored in Zookeeper) 
and restored from the savepoint.

My personal opinion is that Flink on k8s does not seem to need separate 
consideration for this issue.

> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0, 1.6.2
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts/is restarted. This can be 
> problematic if the the {{JobGraph}} generation has side effects. Therefore, 
> it would be better to generate the {{JobGraph}} only once and store it in HA 
> storage instead from where to retrieve.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-10240) Pluggable scheduling strategy for batch jobs

2018-09-25 Thread tison (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tison updated FLINK-10240:
--
Comment: was deleted

(was: Introducing a pluggable scheduling strategy is an excellent idea that could 
greatly expand the cases Flink is able to handle.

I like this idea and can help. However, the document attached above is read-only, 
so I leave my comments in a copy of it linked below. Most of them are layout 
improvements and minor rewording; the body of the document is no more than the 
original design.

https://docs.google.com/document/d/15pUYc5_yrY2IwmnADCoNWZwOIYCOcroWmuuZHs-vdlU/edit?usp=sharing

Note that this is an EDITABLE document and everyone interested in it can leave 
comments or edit it directly. As open source software we simply trust our 
contributors, and the document can be frozen and left comment-only once the 
discussion reaches a consensus.)

> Pluggable scheduling strategy for batch jobs
> 
>
> Key: FLINK-10240
> URL: https://issues.apache.org/jira/browse/FLINK-10240
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Reporter: Zhu Zhu
>Priority: Major
>  Labels: scheduling
>
> Currently batch jobs are scheduled with the LAZY_FROM_SOURCES strategy: source 
> tasks are scheduled in the beginning, and other tasks are scheduled once 
> their input data are consumable.
> However, input data being consumable does not always mean the task can work at 
> once. 
>  
> One example is the hash join operation, where the operator first consumes one 
> side (the build side) to set up a table, then consumes the other side (the 
> probe side) to do the real join work. If the probe side is started 
> early, it just gets stuck on back pressure, as the join operator will not 
> consume data from it before the build stage is done, causing a waste of 
> resources.
> If the probe side task is started after the build stage is done, both 
> the build and probe sides can have more computing resources as they are 
> staggered.
>  
> That's why we think a flexible scheduling strategy is needed, allowing job 
> owners to customize the vertex schedule order and constraints. Better 
> resource utilization usually means better performance.
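
For illustration, a purely hypothetical sketch of the kind of pluggable hook this 
issue asks for; the interface and method names are invented and are not Flink API. 
Vertices are identified here simply by name to keep the sketch self-contained.

    import java.util.Collection;

    // Hypothetical interface, invented for illustration only.
    public interface BatchSchedulingStrategy {

        // Vertices to deploy when scheduling starts, e.g. only the sources
        // and the build side of a hash join.
        Collection<String> verticesToScheduleOnStart();

        // Vertices that become schedulable once the given vertex finishes,
        // e.g. the probe side of a hash join after its build side is done.
        Collection<String> verticesToScheduleAfter(String finishedVertexName);
    }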



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6282: [FLINK-6847][FLINK-6813] [table] TimestampDiff table api and sql support

2018-09-25 Thread GitBox
asfgit closed pull request #6282: [FLINK-6847][FLINK-6813] [table] 
TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index eac6d16eaf8..7d9aee22956 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -3176,6 +3176,18 @@ TIMESTAMPADD(unit, interval, timevalue)
 E.g., TIMESTAMPADD(WEEK, 1, DATE '2003-01-02') returns 
2003-01-09.
   
 
+
+
+  
+{% highlight text %}
+TIMESTAMPDIFF(unit, timestamp1, timestamp2)
+{% endhighlight %}
+  
+  
+Returns the (signed) number of timeUnit intervals between 
timestamp1 and timestamp2. The unit for the interval is given by the unit 
argument, which should be one of the following values: SECOND, 
MINUTE, HOUR, DAY, WEEK, 
MONTH, QUARTER, or YEAR. The unit for 
the interval could refer Time Interval and Point Unit 
Specifiers table. E.g. TIMESTAMPDIFF(DAY, TIMESTAMP '2003-01-02 
10:00:00', TIMESTAMP '2003-01-03 10:00:00') leads to 1.
+  
+
+
   
 
 
@@ -3421,6 +3433,18 @@ dateFormat(TIMESTAMP, STRING)
 E.g., dateFormat(ts, '%Y, %d %M') results in strings 
formatted as "2017, 05 May".
   
 
+
+
+  
+{% highlight java %}
+timestampDiff(TimeIntervalUnit, datetime1, datetime2)
+{% endhighlight %}
+  
+  
+Returns the (signed) number of timeUnit intervals between datetime1 
and datetime2. The unit for the interval is given by the unit argument, which 
should be one of the following values: SECOND, 
MINUTE, HOUR, DAY, MONTH, 
or YEAR. The unit for the interval could refer Time Interval and Point Unit 
Specifiers table. E.g. timestampDiff(TimeIntervalUnit.DAY, 
'2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp) 
leads to 1.
+  
+
+
 
 
 
@@ -3666,6 +3690,18 @@ dateFormat(TIMESTAMP, STRING)
 E.g., dateFormat('ts, "%Y, %d %M") results in strings 
formatted as "2017, 05 May".
   
 
+
+
+  
+{% highlight scala %}
+timestampDiff(TimeIntervalUnit, datetime1, datetime2)
+{% endhighlight %}
+  
+  
+Returns the (signed) number of timeUnit intervals between datetime1 
and datetime2. The unit for the interval is given by the unit argument, which 
should be one of the following values: SECOND, 
MINUTE, HOUR, DAY, MONTH, 
or YEAR. The unit for the interval could refer Time Interval and Point Unit 
Specifiers table. E.g. timestampDiff(TimeIntervalUnit.DAY, 
'2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp) 
leads to 1.
+  
+
+
   
 
 
@@ -5319,4 +5355,65 @@ The following table lists specifiers for date format 
functions.
   
 
 
+Time Interval and Point Unit Specifiers
+--
+
+The following table lists specifiers for time interval and point unit.
+
+  Interval Unit         Point Unit
+  -----------------     -----------
+  YEAR                  YEAR
+  QUARTER               QUARTER
+  MONTH                 MONTH
+  WEEK                  WEEK
+  DAY                   DAY
+  HOUR                  HOUR
+  MINUTE                MINUTE
+  SECOND                SECOND
+  YEAR_TO_MONTH         MICROSECOND
+  DAY_TO_HOUR           MILLISECOND
+  DAY_TO_SECOND
+  HOUR_TO_MINUTE
+  HOUR_TO_SECOND
+  MINUTE_TO_SECOND
+
 {% top %}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index dfe69cb0411..eb4aad7e40a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.{TableException, 
CurrentRow, CurrentRange, Unb
 import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, 
toMilliInterval, toMonthInterval, toRowInterval}
 import org.apache.flink.table.api.Table
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.AggregateFunction
 
@@ -1104,6 +1105,34 @@ object dateFormat {
   }
 }
 
+/**
+ * Returns the (signed) number of timeUnit intervals between timestamp1 and 
timestamp2.
+ *
+ * For example timestampDiff(TimeIntervalUnit.DAY, `2016-06-15`.toDate,
+ *  `2016-06-18`.toDate results in integer as 3
+ */
+object timestampDiff {
+
+  /**
+* Returns the (signed) number of timeUnit intervals between timestamp1 and 
timestamp2.
+*
+* For example tim
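
For illustration, the new SQL function can be exercised from a Java program roughly 
as follows; this assumes a Table API environment named tableEnv has already been 
created in the usual way (not part of this diff), and the SQL literal is taken from 
the documentation example above.

    // Illustrative only.
    Table diff = tableEnv.sqlQuery(
        "SELECT TIMESTAMPDIFF(DAY, TIMESTAMP '2003-01-02 10:00:00', TIMESTAMP '2003-01-03 10:00:00')");
    // Per the documentation above, the resulting value is 1.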

[jira] [Commented] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627664#comment-16627664
 ] 

ASF GitHub Bot commented on FLINK-6847:
---

asfgit closed pull request #6282: [FLINK-6847][FLINK-6813] [table] 
TimestampDiff table api and sql support
URL: https://github.com/apache/flink/pull/6282
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index eac6d16eaf8..7d9aee22956 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -3176,6 +3176,18 @@ TIMESTAMPADD(unit, interval, timevalue)
 E.g., TIMESTAMPADD(WEEK, 1, DATE '2003-01-02') returns 
2003-01-09.
   
 
+
+
+  
+{% highlight text %}
+TIMESTAMPDIFF(unit, timestamp1, timestamp2)
+{% endhighlight %}
+  
+  
+Returns the (signed) number of timeUnit intervals between 
timestamp1 and timestamp2. The unit for the interval is given by the unit 
argument, which should be one of the following values: SECOND, 
MINUTE, HOUR, DAY, WEEK, 
MONTH, QUARTER, or YEAR. The unit for 
the interval could refer Time Interval and Point Unit 
Specifiers table. E.g. TIMESTAMPDIFF(DAY, TIMESTAMP '2003-01-02 
10:00:00', TIMESTAMP '2003-01-03 10:00:00') leads to 1.
+  
+
+
   
 
 
@@ -3421,6 +3433,18 @@ dateFormat(TIMESTAMP, STRING)
 E.g., dateFormat(ts, '%Y, %d %M') results in strings 
formatted as "2017, 05 May".
   
 
+
+
+  
+{% highlight java %}
+timestampDiff(TimeIntervalUnit, datetime1, datetime2)
+{% endhighlight %}
+  
+  
+Returns the (signed) number of timeUnit intervals between datetime1 
and datetime2. The unit for the interval is given by the unit argument, which 
should be one of the following values: SECOND, 
MINUTE, HOUR, DAY, MONTH, 
or YEAR. See the Time Interval and Point Unit 
Specifiers table for the supported units. E.g., timestampDiff(TimeIntervalUnit.DAY, 
'2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp) 
returns 1.
+  
+
+
 
 
 
@@ -3666,6 +3690,18 @@ dateFormat(TIMESTAMP, STRING)
 E.g., dateFormat('ts, "%Y, %d %M") results in strings 
formatted as "2017, 05 May".
   
 
+
+
+  
+{% highlight scala %}
+timestampDiff(TimeIntervalUnit, datetime1, datetime2)
+{% endhighlight %}
+  
+  
+Returns the (signed) number of timeUnit intervals between datetime1 
and datetime2. The unit for the interval is given by the unit argument, which 
should be one of the following values: SECOND, 
MINUTE, HOUR, DAY, MONTH, 
or YEAR. See the Time Interval and Point Unit 
Specifiers table for the supported units. E.g., timestampDiff(TimeIntervalUnit.DAY, 
'2003-01-02 10:00:00'.toTimestamp, '2003-01-03 10:00:00'.toTimestamp) 
returns 1.
+  
+
+
   
 
 
@@ -5319,4 +5355,65 @@ The following table lists specifiers for date format 
functions.
   
 
 
+Time Interval and Point Unit Specifiers
+--
+
+The following table lists specifiers for time interval and point unit.
+
+
+
+  Interval Unit         Point Unit
+  ----------------      -----------
+  YEAR                  YEAR
+  QUARTER               QUARTER
+  MONTH                 MONTH
+  WEEK                  WEEK
+  DAY                   DAY
+  HOUR                  HOUR
+  MINUTE                MINUTE
+  SECOND                SECOND
+  YEAR_TO_MONTH         MICROSECOND
+  DAY_TO_HOUR           MILLISECOND
+  DAY_TO_SECOND
+  HOUR_TO_MINUTE
+  HOUR_TO_SECOND
+  MINUTE_TO_SECOND
+
+
+
 {% top %}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index dfe69cb0411..eb4aad7e40a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.{TableException, 
CurrentRow, CurrentRange, Unb
 import org.apache.flink.table.expressions.ExpressionUtils.{convertArray, 
toMilliInterval, toMonthInterval, toRowInterval}
 import org.apache.flink.table.api.Table
 import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.AggregateFunction
 
@@ -1104,6 +1105,34 @@ object dateFormat {
   }
 }
 
+/**
+ * Returns the (signed) number of timeUnit intervals between timestamp1 and 
timestamp2.
+ *
+ * For example timestamp

[jira] [Resolved] (FLINK-6847) Add TIMESTAMPDIFF supported in TableAPI

2018-09-25 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-6847.
-
   Resolution: Fixed
Fix Version/s: 1.7.0

Fixed in 1.7.0: c5ce970e781df60eb27b62446853eaa0579c8706

> Add TIMESTAMPDIFF supported in TableAPI
> ---
>
> Key: FLINK-6847
> URL: https://issues.apache.org/jira/browse/FLINK-6847
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Priority: Major
>  Labels: pull-request-available, starter
> Fix For: 1.7.0
>
>
> see FLINK-6813



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-6813) Add TIMESTAMPDIFF supported in SQL

2018-09-25 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-6813.
-
   Resolution: Fixed
Fix Version/s: 1.7.0

Fixed as part of FLINK-6847.

> Add TIMESTAMPDIFF supported in SQL
> --
>
> Key: FLINK-6813
> URL: https://issues.apache.org/jira/browse/FLINK-6813
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
> Fix For: 1.7.0
>
>
> TIMESTAMPDIFF(unit,datetime_expr1,datetime_expr2) Returns datetime_expr2 − 
> datetime_expr1, where datetime_expr1 and datetime_expr2 are date or datetime 
> expressions. One expression may be a date and the other a datetime; a date 
> value is treated as a datetime having the time part '00:00:00' where 
> necessary. The unit for the result (an integer) is given by the unit 
> argument. The legal values for unit are the same as those listed in the 
> description of the TIMESTAMPADD() function.
> * Syntax
> TIMESTAMPDIFF(unit,datetime_expr1,datetime_expr2) 
> -unit
> Is the part of datetime_expr1 and datetime_expr2 that specifies the type of 
> boundary crossed.
> -datetime_expr1
> Is an expression that can be resolved to a time, date.
> -datetime_expr2
> Same as datetime_expr1.
> * Example
> SELECT TIMESTAMPDIFF(year, '2015-12-31 23:59:59.999', '2017-01-01 
> 00:00:00.000')  from tab; --> 2
> * See more:
>   
> [MySQL|https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_timestampdiff]
> CALCITE:
> {code}
>  SELECT timestampdiff(YEAR, timestamp '2019-06-01 07:01:11', timestamp 
> '2020-06-01 07:01:11'),timestampdiff(QUARTER, timestamp '2019-06-01 
> 07:01:11', timestamp '2020-06-01 07:01:11'),timestampdiff(MONTH, timestamp 
> '2019-06-01 07:01:11',timestamp '2020-06-01 07:01:11'),timestampdiff(WEEK, 
> timestamp '2019-06-01 07:01:11',timestamp '2020-06-01 
> 07:01:11'),timestampdiff(DAY, timestamp '2019-06-01 07:01:11',timestamp 
> '2020-06-01 07:01:11'),timestampdiff(HOUR, timestamp '2019-06-01 
> 07:01:11',timestamp '2020-06-01 07:01:11'),timestampdiff(MINUTE, timestamp 
> '2019-06-01 07:01:11',timestamp '2020-06-01 07:01:11'),timestampdiff(SECOND, 
> timestamp '2019-06-01 07:01:11',timestamp '2020-06-01 07:01:11') FROM depts;
> | 1 | 4 | 12 | **52** | 366| 8784| 527040 | 
> 31622400  
> {code}
> MSSQL:
> {code}
> SELECT
>   datediff(YEAR, '2019-06-01 07:01:11','2020-06-01 07:01:11'),
>   datediff(QUARTER, '2019-06-01 07:01:11', '2020-06-01 07:01:11'),
>   datediff(MONTH, '2019-06-01 07:01:11','2020-06-01 07:01:11'),
>   datediff(WEEK, '2019-06-01 07:01:11', '2020-06-01 07:01:11'),
>   datediff(DAY, '2019-06-01 07:01:11','2020-06-01 07:01:11'),
>   datediff(HOUR, '2019-06-01 07:01:11','2020-06-01 07:01:11'),
>   datediff(MINUTE, '2019-06-01 07:01:11','2020-06-01 07:01:11'),
>   datediff(SECOND,  '2019-06-01 07:01:11', '2020-06-01 07:01:11')
> FROM stu;
> |1|4  |12 |**53** |366|8784   |527040 |31622400
> {code}
> I have discussed the differences with the Calcite community and found the 
> reason: 
> https://stackoverflow.com/questions/26138167/is-timestampdiff-in-mysql-equivalent-to-datediff-in-sql-server.
> So, in this JIRA we will keep consistency with Calcite.
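
As a quick illustration of the resolved feature, here is a minimal Flink Table API
program (the table name {{T}} and the field names {{t1}}/{{t2}} are made up for this
example; this is a sketch, not code from the pull request):

{code}
import java.sql.Timestamp;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class TimestampDiffExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // One row with two timestamps that are exactly one day apart.
        DataSet<Tuple2<Timestamp, Timestamp>> input = env.fromElements(
                Tuple2.of(Timestamp.valueOf("2003-01-02 10:00:00"),
                          Timestamp.valueOf("2003-01-03 10:00:00")));
        tEnv.registerDataSet("T", input, "t1, t2");

        // TIMESTAMPDIFF(DAY, t1, t2) returns the signed number of whole DAY
        // intervals between t1 and t2, following Calcite's semantics.
        Table result = tEnv.sqlQuery("SELECT TIMESTAMPDIFF(DAY, t1, t2) FROM T");

        DataSet<Row> rows = tEnv.toDataSet(result, Row.class);
        rows.print(); // expected output: 1
    }
}
{code}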



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6725: [FLINK-10263] [sql-client] Fix classloader issues in SQL Client

2018-09-25 Thread GitBox
asfgit closed pull request #6725: [FLINK-10263] [sql-client] Fix classloader 
issues in SQL Client
URL: https://github.com/apache/flink/pull/6725
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index b5830725db1..ca0251365e6 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -212,6 +212,8 @@ tables:
 type: VARCHAR
   - name: duplicate_count
 type: BIGINT
+  - name: constant
+type: VARCHAR
 connector:
   type: filesystem
   path: $RESULT
@@ -226,6 +228,8 @@ tables:
   type: VARCHAR
 - name: duplicate_count
   type: BIGINT
+- name: constant
+  type: VARCHAR
 
 functions:
   - name: RegReplace
@@ -261,7 +265,7 @@ $FLINK_DIR/bin/sql-client.sh embedded \
 
 read -r -d '' SQL_STATEMENT_2 << EOF
 INSERT INTO CsvSinkTable
-  SELECT *
+  SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 
'Success') AS constant
   FROM AvroBothTable
 EOF
 
@@ -285,4 +289,4 @@ for i in {1..10}; do
   sleep 5
 done
 
-check_result_hash "SQLClient" $RESULT "dca08a82cc09f6b19950291dbbef16bb"
+check_result_hash "SQLClient" $RESULT "0a1bf8bf716069b7269f575f87a802c0"
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 85b3e9265a8..552d0b37dca 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -75,6 +75,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /**
  * Context for executing table programs. This class caches everything that can 
be cached across
@@ -183,6 +184,19 @@ public EnvironmentInstance createEnvironmentInstance() {
return tableSinks;
}
 
+   /**
+* Executes the given supplier using the execution context's 
classloader as thread classloader.
+*/
+   public  R wrapClassLoader(Supplier supplier) {
+   final ClassLoader previousClassloader = 
Thread.currentThread().getContextClassLoader();
+   Thread.currentThread().setContextClassLoader(classLoader);
+   try {
+   return supplier.get();
+   } finally {
+   
Thread.currentThread().setContextClassLoader(previousClassloader);
+   }
+   }
+
// 

 
private static CommandLine createCommandLine(Deployment deployment, 
Options commandLineOptions) {
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 3b9e8e99b82..1318043faf1 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -219,14 +219,16 @@ public TableSchema getTableSchema(SessionContext session, 
String name) throws Sq
 
@Override
public String explainStatement(SessionContext session, String 
statement) throws SqlExecutionException {
-   final TableEnvironment tableEnv = 
getOrCreateExecutionContext(session)
+   final ExecutionContext context = 
getOrCreateExecutionContext(session);
+   final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
 
// translate
try {
final Table table = createTable(tableEnv, statement);
-   return tableEnv.explain(table);
+   // explanation requires an optimization step that might 
reference UDFs during code compilation
+   return context.wrapClassLoader(() -> 
tableEnv.explain(table));
} catch (Throwable t) {
// catch everything such that the query does not crash 
the executor
throw new SqlExecutionException("Invalid SQL 
statement.", t);
@@ -242,7 +244,7 @@ public ResultDe
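
For context, the wrapClassLoader(...) method added above follows the standard
save/set/restore pattern for the thread context classloader. A minimal,
self-contained sketch of that pattern (class and method names here are
illustrative, not taken from the PR):

{code}
import java.util.function.Supplier;

/** Runs an action with a given classloader as the thread context classloader. */
public final class ClassLoaderScope {

    private final ClassLoader userCodeClassLoader;

    public ClassLoaderScope(ClassLoader userCodeClassLoader) {
        this.userCodeClassLoader = userCodeClassLoader;
    }

    public <R> R runWrapped(Supplier<R> action) {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(userCodeClassLoader);
        try {
            // Classes looked up inside the action (e.g. UDFs compiled while
            // optimizing a plan) now resolve against the user-code classloader.
            return action.get();
        } finally {
            // Restore the previous classloader even if the action throws.
            thread.setContextClassLoader(previous);
        }
    }
}
{code}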

[jira] [Commented] (FLINK-10263) User-defined function with LITERAL paramters yields CompileException

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627711#comment-16627711
 ] 

ASF GitHub Bot commented on FLINK-10263:


asfgit closed pull request #6725: [FLINK-10263] [sql-client] Fix classloader 
issues in SQL Client
URL: https://github.com/apache/flink/pull/6725
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh 
b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index b5830725db1..ca0251365e6 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -212,6 +212,8 @@ tables:
 type: VARCHAR
   - name: duplicate_count
 type: BIGINT
+  - name: constant
+type: VARCHAR
 connector:
   type: filesystem
   path: $RESULT
@@ -226,6 +228,8 @@ tables:
   type: VARCHAR
 - name: duplicate_count
   type: BIGINT
+- name: constant
+  type: VARCHAR
 
 functions:
   - name: RegReplace
@@ -261,7 +265,7 @@ $FLINK_DIR/bin/sql-client.sh embedded \
 
 read -r -d '' SQL_STATEMENT_2 << EOF
 INSERT INTO CsvSinkTable
-  SELECT *
+  SELECT AvroBothTable.*, RegReplace('Test constant folding.', 'Test', 
'Success') AS constant
   FROM AvroBothTable
 EOF
 
@@ -285,4 +289,4 @@ for i in {1..10}; do
   sleep 5
 done
 
-check_result_hash "SQLClient" $RESULT "dca08a82cc09f6b19950291dbbef16bb"
+check_result_hash "SQLClient" $RESULT "0a1bf8bf716069b7269f575f87a802c0"
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 85b3e9265a8..552d0b37dca 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -75,6 +75,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /**
  * Context for executing table programs. This class caches everything that can 
be cached across
@@ -183,6 +184,19 @@ public EnvironmentInstance createEnvironmentInstance() {
return tableSinks;
}
 
+   /**
+* Executes the given supplier using the execution context's 
classloader as thread classloader.
+*/
+   public  R wrapClassLoader(Supplier supplier) {
+   final ClassLoader previousClassloader = 
Thread.currentThread().getContextClassLoader();
+   Thread.currentThread().setContextClassLoader(classLoader);
+   try {
+   return supplier.get();
+   } finally {
+   
Thread.currentThread().setContextClassLoader(previousClassloader);
+   }
+   }
+
// 

 
private static CommandLine createCommandLine(Deployment deployment, 
Options commandLineOptions) {
diff --git 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 3b9e8e99b82..1318043faf1 100644
--- 
a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -219,14 +219,16 @@ public TableSchema getTableSchema(SessionContext session, 
String name) throws Sq
 
@Override
public String explainStatement(SessionContext session, String 
statement) throws SqlExecutionException {
-   final TableEnvironment tableEnv = 
getOrCreateExecutionContext(session)
+   final ExecutionContext context = 
getOrCreateExecutionContext(session);
+   final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
 
// translate
try {
final Table table = createTable(tableEnv, statement);
-   return tableEnv.explain(table);
+   // explanation requires an optimization step that might 
reference UDFs during code compilation
+   return context.wrapClassLoader(() -> 
tableEnv.explain(tabl

[GitHub] tweise opened a new pull request #6761: [FLINK-10279] [documentation] Make jython limitations more obvious in documentation.

2018-09-25 Thread GitBox
tweise opened a new pull request #6761: [FLINK-10279] [documentation] Make 
jython limitations more obvious in documentation.
URL: https://github.com/apache/flink/pull/6761
 
 
   Purpose of this change is to better highlight the potential restrictions of 
the Jython based streaming API to avoid user confusion.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10279) Make jython limitations more obvious in documentation

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627726#comment-16627726
 ] 

ASF GitHub Bot commented on FLINK-10279:


tweise commented on issue #6761: [FLINK-10279] [documentation] Make jython 
limitations more obvious in documentation.
URL: https://github.com/apache/flink/pull/6761#issuecomment-424442918
 
 
   CC: @StephanEwen @aljoscha @zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make jython limitations more obvious in documentation
> -
>
> Key: FLINK-10279
> URL: https://issues.apache.org/jira/browse/FLINK-10279
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Python API
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> The "Python Programming Guide (Streaming) Beta" at 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html]
>  does not sufficiently highlight limitations of the API. It should probably 
> have a prominent disclaimer right at the top stating that this actually isn't 
> a "Python" API but Jython, which likely means that the user looking for a 
> solution to run native Python code won't be able to use many important 
> libraries, which is often the reason to look for Python support in first 
> place.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tweise commented on issue #6761: [FLINK-10279] [documentation] Make jython limitations more obvious in documentation.

2018-09-25 Thread GitBox
tweise commented on issue #6761: [FLINK-10279] [documentation] Make jython 
limitations more obvious in documentation.
URL: https://github.com/apache/flink/pull/6761#issuecomment-424442918
 
 
   CC: @StephanEwen @aljoscha @zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10279) Make jython limitations more obvious in documentation

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627725#comment-16627725
 ] 

ASF GitHub Bot commented on FLINK-10279:


tweise opened a new pull request #6761: [FLINK-10279] [documentation] Make 
jython limitations more obvious in documentation.
URL: https://github.com/apache/flink/pull/6761
 
 
   Purpose of this change is to better highlight the potential restrictions of 
the Jython based streaming API to avoid user confusion.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make jython limitations more obvious in documentation
> -
>
> Key: FLINK-10279
> URL: https://issues.apache.org/jira/browse/FLINK-10279
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Python API
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> The "Python Programming Guide (Streaming) Beta" at 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html]
>  does not sufficiently highlight limitations of the API. It should probably 
> have a prominent disclaimer right at the top stating that this actually isn't 
> a "Python" API but Jython, which likely means that the user looking for a 
> solution to run native Python code won't be able to use many important 
> libraries, which is often the reason to look for Python support in first 
> place.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10279) Make jython limitations more obvious in documentation

2018-09-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10279:
---
Labels: pull-request-available  (was: )

> Make jython limitations more obvious in documentation
> -
>
> Key: FLINK-10279
> URL: https://issues.apache.org/jira/browse/FLINK-10279
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Python API
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> The "Python Programming Guide (Streaming) Beta" at 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/python.html]
>  does not sufficiently highlight limitations of the API. It should probably 
> have a prominent disclaimer right at the top stating that this actually isn't 
> a "Python" API but Jython, which likely means that the user looking for a 
> solution to run native Python code won't be able to use many important 
> libraries, which is often the reason to look for Python support in first 
> place.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-09-25 Thread Ufuk Celebi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626190#comment-16626190
 ] 

Ufuk Celebi edited comment on FLINK-10292 at 9/25/18 6:24 PM:
--

I understand that non-determinism may be an issue when generating the 
{{JobGraph}}, but do we have some data about how common that is for 
applications? Would it be possible to keep a fixed JobGraph in the image 
instead of persisting one in the {{SubmittedJobGraphStore}}?

I like our current approach, because it keeps the source of truth with 
image-based deployments such as Kubernetes *in* the image instead of the 
{{SubmittedJobGraphStore}}. I'm wondering about the following scenario in 
particular (this is independent of the question whether it runs on Kubernetes 
or not and can be reproduced in another way as well):
 * A user creates a job cluster with high availability enabled (cluster ID for 
the logical application, e.g. myapp)
 ** This will persist the job with a fixed ID (after FLINK-10291) on first 
submission
 * The user kills the application *without* cancelling
 ** This will leave all data in the high availability store(s) such as job 
graphs or checkpoints
 * The user updates the image with a modified application and keeps the high 
availability configuration (e.g. cluster ID stays myapp)
 ** This will result in the job in the image to be ignored since we already 
have a job graph with the same (fixed) ID

I think in such a scenario it can be desirable to still have the checkpoints 
available, but it might be problematic if the job graph is recovered from the 
{{SubmittedJobGraphStore}} instead of using the job that is part of the image. 
What do you think about this scenario? Is it the responsibility of the user to 
handle this? If so, I think that the approach outlined in this ticket makes 
sense. If not, we may want to consider alternatives or ignore potential 
non-determinism.


was (Author: uce):
I understand that non-determinism may be an issue when generating the 
{{JobGraph}}, but do we have some data about how common that is for 
applications? Would it be possible to keep a fixed JobGraph in the image 
instead of persisting one in the {{SubmittedJobGraphStore}}?

I like our current approach, because it keeps the source of truth for the job 
in the image instead of the {{SubmittedJobGraphStore}}. I'm wondering about the 
following scenario:
 * A user creates a job cluster with high availability enabled (cluster ID for 
the logical application, e.g. myapp)
 ** This will persist the job with a fixed ID (after FLINK-10291) on first 
submission
 * The user kills the application *without* cancelling
 ** This will leave all data in the high availability store(s) such as job 
graphs or checkpoints
 * The user updates the image with a modified application and keeps the high 
availability configuration (e.g. cluster ID stays myapp)
 ** This will result in the job in the image to be ignored since we already 
have a job graph with the same (fixed) ID

I think in such a scenario it can be desirable to still have the checkpoints 
available, but it might be problematic if the job graph is recovered from the 
{{SubmittedJobGraphStore}} instead of using the job that is part of the image. 
What do you think about this scenario? Is it the responsibility of the user to 
handle this? If so, I think that the approach outlined in this ticket makes 
sense. If not, we may want to consider alternatives or ignore potential 
non-determinism.

> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0, 1.6.2
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts/is restarted. This can be 
> problematic if the {{JobGraph}} generation has side effects. Therefore, 
> it would be better to generate the {{JobGraph}} only once, store it in HA 
> storage, and retrieve it from there.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-09-25 Thread Ufuk Celebi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627766#comment-16627766
 ] 

Ufuk Celebi commented on FLINK-10292:
-

[~yanghua] I didn't mean to confuse you with the image and Kubernetes example. 
I just took it as *one* example for when the {{StandaloneJobClusterEntryPoint}} 
is used. My main question was independent of this as you also pointed out.

Thanks for your input. To summarize your answer, you think that in such a 
scenario, it should be the **responsibility of the user** to clean up the HA 
data.

> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0, 1.6.2
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts/is restarted. This can be 
> problematic if the {{JobGraph}} generation has side effects. Therefore, 
> it would be better to generate the {{JobGraph}} only once, store it in HA 
> storage, and retrieve it from there.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10225) Cannot access state from a empty taskmanager

2018-09-25 Thread Pierre Zemb (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620743#comment-16620743
 ] 

Pierre Zemb edited comment on FLINK-10225 at 9/25/18 6:40 PM:
--

Hi! I've been reading the code a bit, and I have a question.

From what I can see, only the TaskExecutor has enough knowledge to make a call 
to the ResourceManager. It is also the only one that updates the 
ConcurrentHashMap used by the RPC handler. This means that from inside the 
RPC handler I cannot find a way to cleanly trigger a method of the TaskExecutor.

I like the way the interface for KvStateClientProxy is implemented, and I don't 
want to change it. Do you have an idea of how I could implement this?

cc [~till.rohrmann]


was (Author: pierrez):
Hi! I've been reading the code a bit, and I have a question.

>From what I can see, only the TaskExecutor has enough knowledge to make a call 
>to the ResourceManager. He is also the only one that is updating the 
>ConcurrentHashMap used by the RPC handler. Meaning that when I'm inside the 
>RPC handler, I cannot find a way to nicely trigger a method of TaskExecutor.

 

I like the way the interface for KvStateClientProxy is implemented, and I don't 
want to change , do you have an idea on how could I implement this?

> Cannot access state from a empty taskmanager
> 
>
> Key: FLINK-10225
> URL: https://issues.apache.org/jira/browse/FLINK-10225
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.5.3, 1.6.0
> Environment: 4tm and 1jm for now on 1.6.0
>Reporter: Pierre Zemb
>Priority: Critical
>
> Hi!
> I've started to deploy a small Flink cluster (4tm and 1jm for now on 1.6.0), 
> and deployed a small job on it. Because of the current load, job is 
> completely handled by a single tm. I've created a small proxy that is using 
> [QueryableStateClient|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/queryablestate/client/QueryableStateClient.html]
>  to access the current state. It is working nicely, except under certain 
> circumstances. It seems to me that I can only access the state through a node 
> that is holding a part of the job. Here's an example:
>  * job on tm1. Pointing QueryableStateClient to tm1. State accessible
>  * job still on tm1. Pointing QueryableStateClient to tm2 (for example). 
> State inaccessible
>  * killing tm1, job is now on tm2. State accessible
>  * job still on tm2. Pointing QueryableStateClient to tm3. State inaccessible
>  * adding some parallelism to spread job on tm1 and tm2. Pointing 
> QueryableStateClient to either tm1 and tm2 is working
>  * job still on tm1 and tm2. Pointing QueryableStateClient to tm3. State 
> inaccessible
> When the state is inaccessible, I can see this (generated 
> [here|https://github.com/apache/flink/blob/release-1.6/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java#L228]):
> {code:java}
> java.lang.RuntimeException: Failed request 0. Caused by: 
> org.apache.flink.queryablestate.exceptions.UnknownLocationException: Could 
> not retrieve location of state=repo-status of 
> job=3ac3bc00b2d5bc0752917186a288d40a. Potential reasons are: i) the state is 
> not ready, or ii) the job does not exist. at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getKvStateLookupInfo(KvStateClientProxyHandler.java:228)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.getState(KvStateClientProxyHandler.java:162)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.executeActionAsync(KvStateClientProxyHandler.java:129)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:119)
>  at 
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.handleRequest(KvStateClientProxyHandler.java:63)
>  at 
> org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:236)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> {code}
> Went a bit through the (master branch) code. Class KvStateClientProxy is 
> holding {color:#33}kvStateLocationOracle the key-value state location 
> oracle for the given JobID. Here's the usage{color}{color:#33}:{color}
>  * {color:#33}updateKvStateLocationOracle() in 
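
For reference, a minimal sketch of querying state with QueryableStateClient, as
the reporter's proxy does (the String key and Long value types and the state
name are assumptions for the example; the job id is the one from the stack
trace above):

{code}
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class StateQueryExample {

    public static void main(String[] args) throws Exception {
        // The client must point at a TaskManager that runs the queryable state
        // proxy (default proxy port 9069) and knows about the job, which is
        // exactly the limitation discussed in this ticket.
        QueryableStateClient client = new QueryableStateClient("tm-host", 9069);

        JobID jobId = JobID.fromHexString("3ac3bc00b2d5bc0752917186a288d40a");
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("repo-status", Long.class);

        CompletableFuture<ValueState<Long>> future = client.getKvState(
                jobId, "repo-status", "some-key",
                BasicTypeInfo.STRING_TYPE_INFO, descriptor);

        System.out.println(future.get().value());
    }
}
{code}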

[jira] [Commented] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627806#comment-16627806
 ] 

tison commented on FLINK-10406:
---

* {{testStopSignal}} and  {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High level invocation at {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 *

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627806#comment-16627806
 ] 

tison edited comment on FLINK-10406 at 9/25/18 7:13 PM:


* {{testStopSignal}} and  {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High level invocation at {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 * {{testNullHostnameGoesToLocalhost}} is ported to {{AkkaUtilsTest#"null 
hostname should go to localhost"}}


was (Author: tison):
* {{testStopSignal}} and  {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High level invocation at {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 *

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627806#comment-16627806
 ] 

tison edited comment on FLINK-10406 at 9/25/18 7:22 PM:


* {{testStopSignal}} and  {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High level invocation at {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 * {{testNullHostnameGoesToLocalhost}} is ported to {{AkkaUtilsTest#"null 
hostname should go to localhost"}}
 * {{testRequestPartitionState*}} I would propose to ignore all of them since 
we have FLINK-10319. It proposed to disable {{JobMaster#requestPartitionState}} 
and have one approval and no objection yet. Also cc [~trohrm...@apache.org], 
could you take a look at FLINK-10319 so that we could make the decision of this 
removal?


was (Author: tison):
* {{testStopSignal}} and  {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High level invocation at {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 * {{testNullHostnameGoesToLocalhost}} is ported to {{AkkaUtilsTest#"null 
hostname should go to localhost"}}

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627806#comment-16627806
 ] 

tison edited comment on FLINK-10406 at 9/25/18 7:28 PM:


* {{testStopSignal}} and  {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High level invocation at {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 * {{testNullHostnameGoesToLocalhost}} is ported to {{AkkaUtilsTest#"null 
hostname should go to localhost"}}
 * {{testRequestPartitionState*}} I would propose to ignore all of them since 
we have FLINK-10319. It proposed to disable {{JobMaster#requestPartitionState}} 
and have one approval and no objection yet. Also cc [~trohrm...@apache.org], 
could you take a look at FLINK-10319 so that we could make the decision of this 
removal? (UPDATE: even without FLINK-10319 accepted, these tests should be 
covered by {{JobMasterTest#testRequestPartitionState}} and {{TaskTest#...}})


was (Author: tison):
* {{testStopSignal}} and  {{testStopSignalFail}} are covered by 
{{ExecutionGraphStopTest}}. High level invocation at {{Dispatcher}} and 
{{JobMaster}} level are trivial.
 * {{testNullHostnameGoesToLocalhost}} is ported to {{AkkaUtilsTest#"null 
hostname should go to localhost"}}
 * {{testRequestPartitionState*}} I would propose to ignore all of them since 
we have FLINK-10319. It proposed to disable {{JobMaster#requestPartitionState}} 
and have one approval and no objection yet. Also cc [~trohrm...@apache.org], 
could you take a look at FLINK-10319 so that we could make the decision of this 
removal?

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10423) Forward RocksDB memory metrics to Flink metrics reporter

2018-09-25 Thread Monal Daxini (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627866#comment-16627866
 ] 

Monal Daxini commented on FLINK-10423:
--

It will be great to have this ported to 1.6 release as well. 

Spoke with [~srichter] about this offline.

> Forward RocksDB memory metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables,  etc that would be useful to users wishing 
> greater insight what rocksdb is doing. This work is inspired heavily by the 
> comments on this rocksdb issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10423) Forward RocksDB memory metrics to Flink metrics reporter

2018-09-25 Thread Monal Daxini (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627866#comment-16627866
 ] 

Monal Daxini edited comment on FLINK-10423 at 9/25/18 8:15 PM:
---

It will be great to have this ported to 1.6 release.

Spoke with [~srichter] about this offline.


was (Author: mdaxini):
It will be great to have this ported to 1.6 release as well. 

Spoke with [~srichter] about this offline.

> Forward RocksDB memory metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables,  etc that would be useful to users wishing 
> greater insight what rocksdb is doing. This work is inspired heavily by the 
> comments on this rocksdb issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9363) Bump up the Jackson version

2018-09-25 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9363:
--
Description: 
CVE's for Jackson :

CVE-2017-17485
CVE-2018-5968
CVE-2018-7489


We can upgrade to 2.9.5

  was:
CVE's for Jackson :

CVE-2017-17485
CVE-2018-5968
CVE-2018-7489

We can upgrade to 2.9.5


> Bump up the Jackson version
> ---
>
> Key: FLINK-9363
> URL: https://issues.apache.org/jira/browse/FLINK-9363
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: security
>
> CVE's for Jackson :
> CVE-2017-17485
> CVE-2018-5968
> CVE-2018-7489
> We can upgrade to 2.9.5



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] chunhui-shi commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…

2018-09-25 Thread GitBox
chunhui-shi commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-424544286
 
 
   Nice work. Just a note that the stored history of splits ('inputSplits') does 
not have to be a full history. If we know the list is short enough, then it 
is fine to ship it as is. +1.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628084#comment-16628084
 ] 

ASF GitHub Bot commented on FLINK-10205:


chunhui-shi commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-424544286
 
 
   Nice work. Just a note that the stored history of splits ('inputSplits') does 
not have to be a full history. If we know the list is short enough, then it 
is fine to ship it as is. +1.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve better 
> performance; however, when a DataSourceTask fails and is rerun, it will not get 
> the same splits as its previous attempt. This will introduce inconsistent 
> results or even data corruption.
> Furthermore, if two executions run at the same time (in the batch 
> scenario), these two executions should process the same splits.
> We need to fix this to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits in the ExecutionVertex so 
> that the DataSourceTask will pull splits from there.
>  
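
A rough, framework-agnostic sketch of the idea in the proposal above
(illustrative only, not Flink's actual implementation):

{code}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Records the splits handed to each task so that a re-executed attempt
 * replays exactly the same sequence instead of pulling fresh splits.
 */
class ReplayableSplitAssigner<S> {

    private final Supplier<S> splitSource;

    // Splits already handed out, per task index, in assignment order.
    private final Map<Integer, List<S>> history = new HashMap<>();

    ReplayableSplitAssigner(Supplier<S> splitSource) {
        this.splitSource = splitSource;
    }

    /** Returns the n-th split for a task, fetching a new split only the first time. */
    synchronized S getSplit(int taskIndex, int n) {
        List<S> assigned = history.computeIfAbsent(taskIndex, k -> new ArrayList<>());
        while (assigned.size() <= n) {
            // Recorded here, so a restarted attempt sees the same split again.
            assigned.add(splitSource.get());
        }
        return assigned.get(n);
    }
}
{code}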



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-09-25 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628139#comment-16628139
 ] 

vinoyang commented on FLINK-10292:
--

Hi [~uce] Thank you for the clarification on "image"; it does not change much. 
My general view is that Flink running in a virtualized environment and in a 
standalone environment are not very different in nature, and the two can be 
treated equally.

[~till.rohrmann], what do you think about our discussion? If you have time, 
please also help review the PR for FLINK-10291; it should not be controversial.

> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0, 1.6.2
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts/is restarted. This can be 
> problematic if the {{JobGraph}} generation has side effects. Therefore, 
> it would be better to generate the {{JobGraph}} only once, store it in HA 
> storage, and retrieve it from there.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628148#comment-16628148
 ] 

tison commented on FLINK-10406:
---

- {{testSavepointRestoreSettings}} is covered by 
{{JobMaster#testRestoringFromSavepoint}}

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Myasuka commented on issue #5984: [FLINK-9328][state] Fix RocksDBStateBackend restore problem due to StateBackendTestBase not registering snapshots

2018-09-25 Thread GitBox
Myasuka commented on issue #5984: [FLINK-9328][state] Fix RocksDBStateBackend 
restore problem due to StateBackendTestBase not registering snapshots
URL: https://github.com/apache/flink/pull/5984#issuecomment-424561812
 
 
   @azagrebin Sure, since the test code is already fixed, just close this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9328) RocksDBStateBackend might use PlaceholderStreamStateHandle to restor due to StateBackendTestBase class not register snapshots in some UTs

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628169#comment-16628169
 ] 

ASF GitHub Bot commented on FLINK-9328:
---

Myasuka commented on issue #5984: [FLINK-9328][state] Fix RocksDBStateBackend 
restore problem due to StateBackendTestBase not registering snapshots
URL: https://github.com/apache/flink/pull/5984#issuecomment-424561812
 
 
   @azagrebin Sure, since the test code is already fixed, just close this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RocksDBStateBackend might use PlaceholderStreamStateHandle to restor due to 
> StateBackendTestBase class not register snapshots in some UTs
> -
>
> Key: FLINK-9328
> URL: https://issues.apache.org/jira/browse/FLINK-9328
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Yun Tang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.5.5
>
>
> Currently, StateBackendTestBase class does not register snapshots to 
> SharedStateRegistry in testValueState, testListState, testReducingState, 
> testFoldingState and testMapState UTs, which may cause RocksDBStateBackend to 
> restore from PlaceholderStreamStateHandle during the 2nd restore procedure if 
> one specific sst file both existed in the 1st snapshot and the 2nd snapshot 
> handle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed

2018-09-25 Thread GitBox
XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for 
TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-424565714
 
 
   @StephanEwen Hello, regarding the two questions you raised yesterday, I have 
some thoughts of my own and I am not sure whether they are right.
   1. Where should the BOM be read? I think it is still necessary to add logic 
that processes the BOM at the very beginning of the file, and to add an 
attribute that records the BOM encoding of the file. For example, this could 
be done in the `createInputSplits` function.
   2. Regarding the second (performance) problem, the previously detected BOM 
can be used to distinguish UTF-8 with BOM, UTF-16 with BOM and UTF-32 with BOM, 
and to control the byte size used when processing the end of each line. The 
garbled output in the earlier bug is really an encoding problem: part of it is 
caused by improper handling of the trailing bytes of each line. I have done the 
following for this problem:

       String utf8 = "UTF-8";
       String utf16 = "UTF-16";
       String utf32 = "UTF-32";
       int stepSize = 0;
       String charsetName = this.getCharsetName();
       if (charsetName.contains(utf8)) {
           stepSize = 1;
       } else if (charsetName.contains(utf16)) {
           stepSize = 2;
       } else if (charsetName.contains(utf32)) {
           stepSize = 4;
       }
       // Check if \n is used as delimiter and the end of this line is a \r,
       // then remove \r from the line
       if (this.getDelimiter() != null && this.getDelimiter().length == 1
               && this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= stepSize
               && bytes[offset + numBytes - stepSize] == CARRIAGE_RETURN) {
           numBytes -= stepSize;
       }
       numBytes = numBytes - stepSize + 1;
       return new String(bytes, offset, numBytes, this.getCharsetName());

   These are some of my own ideas. I hope you can give some better suggestions 
so that this JIRA is handled well. Thank you.
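
   For illustration, a self-contained sketch of the kind of BOM sniffing
described in point 1 (this is not the code proposed in this PR):

{code}
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class BomSniffer {

    /** Peeks at the first bytes of a stream and derives the charset from the BOM. */
    static Charset detectCharset(InputStream in, Charset declared) throws IOException {
        // Note: a real implementation must also remember how many BOM bytes
        // were consumed so that they are skipped when reading the records.
        byte[] b = new byte[4];
        int n = in.read(b);
        if (n >= 4 && (b[0] & 0xFF) == 0xFF && (b[1] & 0xFF) == 0xFE && b[2] == 0 && b[3] == 0) {
            return Charset.forName("UTF-32LE");   // FF FE 00 00
        }
        if (n >= 4 && b[0] == 0 && b[1] == 0 && (b[2] & 0xFF) == 0xFE && (b[3] & 0xFF) == 0xFF) {
            return Charset.forName("UTF-32BE");   // 00 00 FE FF
        }
        if (n >= 3 && (b[0] & 0xFF) == 0xEF && (b[1] & 0xFF) == 0xBB && (b[2] & 0xFF) == 0xBF) {
            return StandardCharsets.UTF_8;        // EF BB BF
        }
        if (n >= 2 && (b[0] & 0xFF) == 0xFF && (b[1] & 0xFF) == 0xFE) {
            return StandardCharsets.UTF_16LE;     // FF FE
        }
        if (n >= 2 && (b[0] & 0xFF) == 0xFE && (b[1] & 0xFF) == 0xFF) {
            return StandardCharsets.UTF_16BE;     // FE FF
        }
        return declared; // no BOM: fall back to the charset the caller configured
    }
}
{code}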


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628183#comment-16628183
 ] 

ASF GitHub Bot commented on FLINK-10134:


XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for 
TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-424565714
 
 
   @StephanEwen Hello, regarding the two questions you raised yesterday, I have 
some thoughts of my own and I am not sure whether they are right.
   1. Where should the BOM be read? I think it is still necessary to add logic 
that processes the BOM at the very beginning of the file, and to add an 
attribute that records the BOM encoding of the file. For example, this could 
be done in the `createInputSplits` function.
   2. Regarding the second (performance) problem, the previously detected BOM 
can be used to distinguish UTF-8 with BOM, UTF-16 with BOM and UTF-32 with BOM, 
and to control the byte size used when processing the end of each line. The 
garbled output in the earlier bug is really an encoding problem: part of it is 
caused by improper handling of the trailing bytes of each line. I have done the 
following for this problem:

       String utf8 = "UTF-8";
       String utf16 = "UTF-16";
       String utf32 = "UTF-32";
       int stepSize = 0;
       String charsetName = this.getCharsetName();
       if (charsetName.contains(utf8)) {
           stepSize = 1;
       } else if (charsetName.contains(utf16)) {
           stepSize = 2;
       } else if (charsetName.contains(utf32)) {
           stepSize = 4;
       }
       // Check if \n is used as delimiter and the end of this line is a \r,
       // then remove \r from the line
       if (this.getDelimiter() != null && this.getDelimiter().length == 1
               && this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= stepSize
               && bytes[offset + numBytes - stepSize] == CARRIAGE_RETURN) {
           numBytes -= stepSize;
       }
       numBytes = numBytes - stepSize + 1;
       return new String(bytes, offset, numBytes, this.getCharsetName());

   These are some of my own ideas. I hope you can give some better suggestions 
so that this JIRA is handled well. Thank you.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> UTF-16 support for TextInputFormat
> --
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.2
>Reporter: David Dreyfus
>Priority: Blocker
>  Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". It 
> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) 
> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>  
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), 
> which sets TextInputFormat.charsetName and then modifies the previously set 
> delimiterString to construct the proper byte string encoding of the the 
> delimiter. This same charsetName is also used in TextInputFormat.readRecord() 
> to interpret the bytes read from the file.
>  
> There are two problems that this implementation would seem to have when using 
> UTF-16.
>  # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will 
> return a Big Endian byte sequence including the Byte Order Mark (BOM). The 
> actual text file will not contain a BOM at each line ending, so the delimiter 
> will never be read. Moreover, if the actual byte encoding of the file is 
> Little Endian, the bytes will be interpreted incorrectly.
>  # TextInputFormat.readRecord() will not see a BOM each time it decodes a 
> byte sequence with the String(bytes, offset, numBytes, charset) call. 
> Therefore, it will assume Big Endian, which may not always be correct. [1] 
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>  
> While there are likely many solutions, I would think that all of them would 
> have to start by reading the BOM from the file when a Split is opened and 
> then using that BOM to modify the specified encoding to a BOM specific one 
> when the caller doesn't specify one, and to overwrite the caller's 
> specification if the BOM is in conflict with the caller's specification. That 
> is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, 
> Flink should rewrite the charsetName as UTF-16LE.
>  I hope this makes sense and that I haven't been testing incorre
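For reference, a small standalone snippet (plain Java, added here only for 
illustration) reproduces problem 1 above: encoding the delimiter with the plain 
"UTF-16" charset prepends a big-endian BOM, so the resulting byte sequence never 
matches the line breaks inside the file.
```java
import java.nio.charset.Charset;

public class DelimiterBomDemo {
    public static void main(String[] args) {
        byte[] delimiter = "\n".getBytes(Charset.forName("UTF-16"));
        // Prints: FE FF 00 0A -- a BOM followed by the big-endian newline,
        // whereas the file contains only 00 0A (BE) or 0A 00 (LE) at each line break.
        for (byte b : delimiter) {
            System.out.printf("%02X ", b);
        }
        System.out.println();
    }
}
```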

[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628148#comment-16628148
 ] 

tison edited comment on FLINK-10406 at 9/26/18 2:57 AM:


- {{testSavepointRestoreSettings}} is covered by 
{{JobMaster#testRestoringFromSavepoint}}

the {{triggerSavepoint}} part is covered by {{JobMasterTriggerSavepointIT}}, 
and the submit-failure part should be taken care of when porting 
{{JobSubmitTest}}, which has a test {{testAnswerFailureWhenSavepointReadFails}}


was (Author: tison):
- {{testSavepointRestoreSettings}} is covered by 
{{JobMaster#testRestoringFromSavepoint}}

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10427) Port JobSubmitTest to new code base

2018-09-25 Thread tison (JIRA)
tison created FLINK-10427:
-

 Summary: Port JobSubmitTest to new code base
 Key: FLINK-10427
 URL: https://issues.apache.org/jira/browse/FLINK-10427
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Affects Versions: 1.7.0
Reporter: tison
Assignee: tison
 Fix For: 1.7.0


Port {{JobSubmitTest}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628197#comment-16628197
 ] 

tison commented on FLINK-10406:
---

- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10420) Create and drop view in sql client should check the view created based on the configuration.

2018-09-25 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628208#comment-16628208
 ] 

vinoyang commented on FLINK-10420:
--

[~hequn8128] What do you think about this issue?

> Create and drop view in sql client should check the view created based on the 
> configuration.
> 
>
> Key: FLINK-10420
> URL: https://issues.apache.org/jira/browse/FLINK-10420
> Project: Flink
>  Issue Type: Bug
>  Components: SQL Client
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>
> Currently, only the current session is checked:
> {code:java}
> private void callCreateView(SqlCommandCall cmdCall) {
>final String name = cmdCall.operands[0];
>final String query = cmdCall.operands[1];
>//here
>final String previousQuery = context.getViews().get(name);
>if (previousQuery != null) {
>   printExecutionError(CliStrings.MESSAGE_VIEW_ALREADY_EXISTS);
>   return;
>}
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628197#comment-16628197
 ] 

tison edited comment on FLINK-10406 at 9/26/18 3:26 AM:


- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

- {{testCancelWithSavepointNoDirectoriesConfigured}} is somewhat covered by 
{{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}. Currently we 
don't provide a detailed error message to dig out the cause when a savepoint 
fails. {{testDoNotCancelJobIfSavepointFails}} tests the case where permission to 
the savepoint path is denied, but changing it to a /not/exist/path exercises the 
same code path.


was (Author: tison):
- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628197#comment-16628197
 ] 

tison edited comment on FLINK-10406 at 9/26/18 3:27 AM:


- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

- {{testCancelWithSavepointNoDirectoriesConfigured}} is somewhat covered by 
{{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}. Currently we 
don't provide a detailed error message to dig out the cause when a savepoint 
fails. {{testDoNotCancelJobIfSavepointFails}} tests the case where permission to 
the savepoint path is denied, but changing it to a /not/exist/path exercises the 
same code path.

the exception is stringified as "java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to 
trigger savepoint. Decline reason: An Exception occurred while triggering the 
checkpoint."


was (Author: tison):
- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

- {{testCancelWithSavepointNoDirectoriesConfigured}} is somewhat covered by 
{{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}. Currently we 
don't provide a detailed error message to dig out the cause when a savepoint 
fails. {{testDoNotCancelJobIfSavepointFails}} tests the case where permission to 
the savepoint path is denied, but changing it to a /not/exist/path exercises the 
same code path.

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10406) Port JobManagerTest to new code base

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628197#comment-16628197
 ] 

tison edited comment on FLINK-10406 at 9/26/18 3:41 AM:


- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

- {{testCancelWithSavepointNoDirectoriesConfigured}} is somewhat covered by 
{{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}. Currently we 
don't provide a detailed error message to dig out the cause when a savepoint 
fails. {{testDoNotCancelJobIfSavepointFails}} tests the case where permission to 
the savepoint path is denied, but changing it to a /not/exist/path exercises the 
same code path.

the exception is stringified as "java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to 
trigger savepoint. Decline reason: An Exception occurred while triggering the 
checkpoint."

- {{testCancelJobWithSavepointFailurePeriodicCheckpoints}} is covered by 
{{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}.


was (Author: tison):
- {{testCancelWithSavepoint}} is covered by 
{{JobMasterTriggerSavepointIT#testStopJobAfterSavepoint}}

- {{testCancelWithSavepointNoDirectoriesConfigured}} is somewhat covered by 
{{JobMasterTriggerSavepointIT#testDoNotCancelJobIfSavepointFails}}. Currently we 
don't provide a detailed error message to dig out the cause when a savepoint 
fails. {{testDoNotCancelJobIfSavepointFails}} tests the case where permission to 
the savepoint path is denied, but changing it to a /not/exist/path exercises the 
same code path.

the exception is stringified as "java.util.concurrent.ExecutionException: 
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.checkpoint.CheckpointTriggerException: Failed to 
trigger savepoint. Decline reason: An Exception occurred while triggering the 
checkpoint."

> Port JobManagerTest to new code base
> 
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Assignee: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by 
> {{JobMasterTest}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10392) Remove legacy mode

2018-09-25 Thread tison (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628234#comment-16628234
 ] 

tison commented on FLINK-10392:
---

[~till.rohrmann] I am afraid that if we add a sub-task for every test-porting job, 
then we will get a long list even before we start removing the legacy production 
files. Is that acceptable, or should we do some squashing?

> Remove legacy mode
> --
>
> Key: FLINK-10392
> URL: https://issues.apache.org/jira/browse/FLINK-10392
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.0
>
>
> This issue is the umbrella issue to remove the legacy mode code from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zhijiangW opened a new pull request #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool

2018-09-25 Thread GitBox
zhijiangW opened a new pull request #6762: [FLINK-10339][network] Use off-heap 
memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762
 
 
   ## What is the purpose of the change
   
   Currently the `NetworkBufferPool` already uses off-heap memory directly, but 
the `BufferPool` for `SpilledSubpartitionView` still uses heap memory by 
default. It should behave the same way as the `NetworkBufferPool`, and it may 
benefit from reusing this off-heap memory in the Netty stack during transport.
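   For illustration only, here is a minimal sketch (plain Java NIO, not the Flink 
buffer pool API) of the difference between the current heap-backed allocation and 
the proposed direct allocation; the class name and buffer size are made up for 
this example.
```java
import java.nio.ByteBuffer;

public class SpillReadBufferAllocationSketch {

    private static final int MEMORY_SEGMENT_SIZE = 32 * 1024; // illustrative size only

    // Today: buffers backed by byte[] on the JVM heap.
    static ByteBuffer allocateHeapBuffer() {
        return ByteBuffer.allocate(MEMORY_SEGMENT_SIZE);
    }

    // Proposed: direct (off-heap) buffers, which Netty can hand to the socket without an extra copy.
    static ByteBuffer allocateOffHeapBuffer() {
        return ByteBuffer.allocateDirect(MEMORY_SEGMENT_SIZE);
    }

    public static void main(String[] args) {
        System.out.println("heap buffer direct?     " + allocateHeapBuffer().isDirect());    // false
        System.out.println("off-heap buffer direct? " + allocateOffHeapBuffer().isDirect()); // true
    }
}
```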
   
   ## Brief change log
   
 - *Use off-heap memory directly for `SpillReadBufferPool` in 
`SpilledSubpartitionView`.*
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*SpillableSubpartitionTest*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (yes)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628295#comment-16628295
 ] 

ASF GitHub Bot commented on FLINK-10339:


zhijiangW opened a new pull request #6762: [FLINK-10339][network] Use off-heap 
memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762
 
 
   ## What is the purpose of the change
   
   Currently the `NetworkBufferPool` already uses off-heap memory directly, but 
the `BufferPool` for `SpilledSubpartitionView` still uses heap memory by 
default. It should behave the same way as the `NetworkBufferPool`, and it may 
benefit from reusing this off-heap memory in the Netty stack during transport.
   
   ## Brief change log
   
 - *Use off-heap memory directly for `SpillReadBufferPool` in 
`SpilledSubpartitionView`.*
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*SpillableSubpartitionTest*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (yes)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SpillReadBufferPool cannot use off-heap memory
> --
>
> Key: FLINK-10339
> URL: https://issues.apache.org/jira/browse/FLINK-10339
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>
> Currently, the {{NetworkBufferPool}} always uses off-heap memory to reduce 
> memory copies from the Flink {{Buffer}} to Netty's internal {{ByteBuf}} during 
> transport on the sender side.
>  
> But the {{SpillReadBufferPool}} in {{SpilledSubpartitionView}} still uses 
> heap memory for caching. We can make it off-heap by default, similar to the 
> {{NetworkBufferPool}}, or decide the type based on the existing parameter 
> {{taskmanager.memory.off-heap}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-09-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10339:
---
Labels: pull-request-available  (was: )

> SpillReadBufferPool cannot use off-heap memory
> --
>
> Key: FLINK-10339
> URL: https://issues.apache.org/jira/browse/FLINK-10339
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
>
> Currently, the {{NetworkBufferPool}} always uses off-heap memory to reduce 
> memory copies from the Flink {{Buffer}} to Netty's internal {{ByteBuf}} during 
> transport on the sender side.
>  
> But the {{SpillReadBufferPool}} in {{SpilledSubpartitionView}} still uses 
> heap memory for caching. We can make it off-heap by default, similar to the 
> {{NetworkBufferPool}}, or decide the type based on the existing parameter 
> {{taskmanager.memory.off-heap}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8851) SQL Client fails if same file is used as default and env configuration

2018-09-25 Thread xuqianjin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628299#comment-16628299
 ] 

xuqianjin commented on FLINK-8851:
--

I verified that using the same configuration file does trigger this bug; I will 
try to fix it.

> SQL Client fails if same file is used as default and env configuration
> --
>
> Key: FLINK-8851
> URL: https://issues.apache.org/jira/browse/FLINK-8851
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Critical
> Fix For: 1.5.5
>
>
> Specifying the same file as default and environment configuration yields the 
> following exception
> {code:java}
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:156)
> Caused by: java.lang.UnsupportedOperationException
>     at java.util.AbstractMap.put(AbstractMap.java:209)
>     at java.util.AbstractMap.putAll(AbstractMap.java:281)
>     at 
> org.apache.flink.table.client.config.Environment.merge(Environment.java:107)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.createEnvironment(LocalExecutor.java:461)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.listTables(LocalExecutor.java:203)
>     at 
> org.apache.flink.table.client.cli.CliClient.callShowTables(CliClient.java:270)
>     at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:198)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:97)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:146){code}
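As a side note for anyone debugging this, here is a minimal standalone sketch (not 
Flink code) of how such a stack trace can arise: {{AbstractMap.put}} throws 
{{UnsupportedOperationException}} unless a subclass overrides it, so calling 
{{putAll}} on a read-only map fails exactly as shown above. Whether this is the 
precise map involved in {{Environment.merge}} is an assumption here.
{code:java}
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

public class UnsupportedPutDemo {
    public static void main(String[] args) {
        // A read-only map that inherits put() from AbstractMap, which throws by default.
        Map<String, String> readOnly = new AbstractMap<String, String>() {
            @Override
            public Set<Map.Entry<String, String>> entrySet() {
                return Collections.emptySet();
            }
        };
        // Same call chain as in the stack trace: putAll -> put -> UnsupportedOperationException.
        readOnly.putAll(Collections.singletonMap("key", "value"));
    }
}
{code}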



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8851) SQL Client fails if same file is used as default and env configuration

2018-09-25 Thread xuqianjin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16628299#comment-16628299
 ] 

xuqianjin edited comment on FLINK-8851 at 9/26/18 6:36 AM:
---

I verified on version 1.6 that using the same configuration file does trigger 
this bug; I will try to fix it.


was (Author: x1q1j1):
I verified that using the same configuration file does trigger this bug; I will 
try to fix it.

> SQL Client fails if same file is used as default and env configuration
> --
>
> Key: FLINK-8851
> URL: https://issues.apache.org/jira/browse/FLINK-8851
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Critical
> Fix For: 1.5.5
>
>
> Specifying the same file as default and environment configuration yields the 
> following exception
> {code:java}
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:156)
> Caused by: java.lang.UnsupportedOperationException
>     at java.util.AbstractMap.put(AbstractMap.java:209)
>     at java.util.AbstractMap.putAll(AbstractMap.java:281)
>     at 
> org.apache.flink.table.client.config.Environment.merge(Environment.java:107)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.createEnvironment(LocalExecutor.java:461)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.listTables(LocalExecutor.java:203)
>     at 
> org.apache.flink.table.client.cli.CliClient.callShowTables(CliClient.java:270)
>     at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:198)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:97)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:146){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-8851) SQL Client fails if same file is used as default and env configuration

2018-09-25 Thread xuqianjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuqianjin updated FLINK-8851:
-
Comment: was deleted

(was: I verified on version 1.6 that using the same configuration file does 
trigger this bug; I will try to fix it.)

> SQL Client fails if same file is used as default and env configuration
> --
>
> Key: FLINK-8851
> URL: https://issues.apache.org/jira/browse/FLINK-8851
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Critical
> Fix For: 1.5.5
>
>
> Specifying the same file as default and environment configuration yields the 
> following exception
> {code:java}
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:156)
> Caused by: java.lang.UnsupportedOperationException
>     at java.util.AbstractMap.put(AbstractMap.java:209)
>     at java.util.AbstractMap.putAll(AbstractMap.java:281)
>     at 
> org.apache.flink.table.client.config.Environment.merge(Environment.java:107)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.createEnvironment(LocalExecutor.java:461)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.listTables(LocalExecutor.java:203)
>     at 
> org.apache.flink.table.client.cli.CliClient.callShowTables(CliClient.java:270)
>     at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:198)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:97)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:146){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6755: [FLINK-10412] toString field in AbstractID should be transient to avoid been serialized

2018-09-25 Thread GitBox
StephanEwen commented on issue #6755: [FLINK-10412] toString field in 
AbstractID should be transient to avoid been serialized
URL: https://github.com/apache/flink/pull/6755#issuecomment-424229947
 
 
   I would prefer not to make this change.
   
   I fear that there are some corner cases in which Java serialization is still 
used and where this could break setups. The benefit of this change does not seem 
big enough to risk that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10292) Generate JobGraph in StandaloneJobClusterEntrypoint only once

2018-09-25 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626884#comment-16626884
 ] 

vinoyang commented on FLINK-10292:
--

[~uce] First of all, I have a bit of doubt about your comment. What do you mean 
by "image" here? Does the scenario you describe assume it is started in Docker?

> Generate JobGraph in StandaloneJobClusterEntrypoint only once
> -
>
> Key: FLINK-10292
> URL: https://issues.apache.org/jira/browse/FLINK-10292
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0, 1.6.2
>
>
> Currently the {{StandaloneJobClusterEntrypoint}} generates the {{JobGraph}} 
> from the given user code every time it starts/is restarted. This can be 
> problematic if the {{JobGraph}} generation has side effects. Therefore, 
> it would be better to generate the {{JobGraph}} only once, store it in HA 
> storage, and retrieve it from there instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626885#comment-16626885
 ] 

ASF GitHub Bot commented on FLINK-10412:


StephanEwen commented on issue #6755: [FLINK-10412] toString field in 
AbstractID should be transient to avoid been serialized
URL: https://github.com/apache/flink/pull/6755#issuecomment-424229947
 
 
   I would prefer not to make this change.
   
   I fear that there are some corner cases in which Java serialization is still 
used and where this could break setups. The benefit of this change does not seem 
big enough to risk that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> toString field in AbstractID should be transient to avoid been serialized
> -
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Assignee: vinoyang
>Priority: Major
>  Labels: deploy,deployment, pull-request-available, serialization
>
> The toString field in AbstractID will be serialized currently, which makes 
> RPC messages body like InputChannelDeploymentDescriptor and PartitionInfo 
> larger (50%+).
> It adds more pressure to JM memory especially in large scale job scheduling 
> (1x1 ALL-to-ALL connection).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6755: [FLINK-10412] toString field in AbstractID should be transient to avoid been serialized

2018-09-25 Thread GitBox
yanghua commented on issue #6755: [FLINK-10412] toString field in AbstractID 
should be transient to avoid been serialized
URL: https://github.com/apache/flink/pull/6755#issuecomment-424231570
 
 
   hi @StephanEwen How can not serializing it "break setups"? It's just an 
internal cache that improves the performance of the `toString()` method and is 
not used anywhere else. When `toString()` is called again after deserialization, 
the field is simply regenerated, so the behavior of `toString()` stays 
idempotent. A minimal sketch of the pattern is shown below.
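   For context, a minimal sketch (a simplified ID class, not the actual Flink 
`AbstractID`) of the transient cached-toString pattern being discussed; after 
Java deserialization the cache is simply `null` and is rebuilt on the next 
`toString()` call.
```java
import java.io.Serializable;
import java.util.UUID;

public class CachedToStringId implements Serializable {

    private static final long serialVersionUID = 1L;

    private final long upperPart;
    private final long lowerPart;

    // Cached hex representation; transient, so it is not written into serialized RPC payloads.
    private transient String toStringCache;

    public CachedToStringId() {
        UUID uuid = UUID.randomUUID();
        this.upperPart = uuid.getMostSignificantBits();
        this.lowerPart = uuid.getLeastSignificantBits();
    }

    @Override
    public String toString() {
        if (toStringCache == null) {
            // Rebuilt lazily, so the result is identical before and after deserialization.
            toStringCache = String.format("%016x%016x", upperPart, lowerPart);
        }
        return toStringCache;
    }
}
```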


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10412) toString field in AbstractID should be transient to avoid been serialized

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626897#comment-16626897
 ] 

ASF GitHub Bot commented on FLINK-10412:


yanghua commented on issue #6755: [FLINK-10412] toString field in AbstractID 
should be transient to avoid been serialized
URL: https://github.com/apache/flink/pull/6755#issuecomment-424231570
 
 
   hi @StephanEwen How can not serializing it "break setups"? It's just an 
internal cache that improves the performance of the `toString()` method and is 
not used anywhere else. When `toString()` is called again after deserialization, 
the field is simply regenerated, so the behavior of `toString()` stays 
idempotent.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> toString field in AbstractID should be transient to avoid been serialized
> -
>
> Key: FLINK-10412
> URL: https://issues.apache.org/jira/browse/FLINK-10412
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Assignee: vinoyang
>Priority: Major
>  Labels: deploy,deployment, pull-request-available, serialization
>
> The toString field in AbstractID will be serialized currently, which makes 
> RPC messages body like InputChannelDeploymentDescriptor and PartitionInfo 
> larger (50%+).
> It adds more pressure to JM memory especially in large scale job scheduling 
> (1x1 ALL-to-ALL connection).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #6751: [FLINK-10403] Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base

2018-09-25 Thread GitBox
tillrohrmann commented on a change in pull request #6751: [FLINK-10403] Port 
JobManagerHAProcessFailureBatchRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/6751#discussion_r220079997
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
 ##
 @@ -212,7 +216,8 @@ public void flatMap(Long value, Collector out) 
throws Exception {
}
 
@Test
-   public void testJobManagerProcessFailure() throws Exception {
+   public void testDispatcherProcessFailure() throws Exception {
 
 Review comment:
   Thanks for pointing this out @TisonKun. I will look into it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10403) Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626899#comment-16626899
 ] 

ASF GitHub Bot commented on FLINK-10403:


tillrohrmann commented on a change in pull request #6751: [FLINK-10403] Port 
JobManagerHAProcessFailureBatchRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/6751#discussion_r220079997
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
 ##
 @@ -212,7 +216,8 @@ public void flatMap(Long value, Collector out) 
throws Exception {
}
 
@Test
-   public void testJobManagerProcessFailure() throws Exception {
+   public void testDispatcherProcessFailure() throws Exception {
 
 Review comment:
   Thanks for pointing this out @TisonKun. I will look into it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base
> ---
>
> Key: FLINK-10403
> URL: https://issues.apache.org/jira/browse/FLINK-10403
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{JobManagerHAProcessFailureBatchRecoveryITCase}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kleingeist commented on a change in pull request #6727: [FLINK-10371] Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-25 Thread GitBox
kleingeist commented on a change in pull request #6727: [FLINK-10371] Allow to 
enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#discussion_r220081405
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
 ##
 @@ -78,6 +78,29 @@ public void checkEnableSSL() {
assertFalse(SSLUtils.isRestSSLEnabled(precedence));
}
 
+   /**
+* Tests whether activation of REST mutual SSL authentication evaluates 
the config flags correctly.
+*/
+   @SuppressWarnings("deprecation")
 
 Review comment:
   the `SSL_ENABLED` option is deprecated, but suppressing the warning with the 
annotation is a bad way of handling this; I should rather use the new options


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10371) Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626908#comment-16626908
 ] 

ASF GitHub Bot commented on FLINK-10371:


kleingeist commented on a change in pull request #6727: [FLINK-10371] Allow to 
enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#discussion_r220081405
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
 ##
 @@ -78,6 +78,29 @@ public void checkEnableSSL() {
assertFalse(SSLUtils.isRestSSLEnabled(precedence));
}
 
+   /**
+* Tests whether activation of REST mutual SSL authentication evaluates 
the config flags correctly.
+*/
+   @SuppressWarnings("deprecation")
 
 Review comment:
   the `SSL_ENABLED` option is deprecated, but suppressing the warning with the 
annotation is a bad way of handling this; I should rather use the new options


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow to enable SSL mutual authentication on REST endpoints by configuration
> 
>
> Key: FLINK-10371
> URL: https://issues.apache.org/jira/browse/FLINK-10371
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, REST, Security
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Johannes Dillmann
>Assignee: Johannes Dillmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> With Flink 1.6 SSL mutual authentication was introduced for internal 
> connectivity in FLINK-9312. 
>  SSL support for external connectivity was also introduced in regard to 
> encryption of the connection and verification of the Flink REST endpoint from 
> the client side.
> But _mutual authentication between the REST endpoint and clients is not 
> supported yet_.
>  The [documentation suggests 
> |https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/security-ssl.html]
>  using a side car proxy to enable SSL mutual auth on the REST endpoint and 
> points out the advantages of using a feature rich proxy.
> While this is a good rationale, there are still important use cases for 
> support of  simple mutual authentication directly in Flink: Mainly support 
> for using standard images in a containerized environment.
> There are tools used to set up Flink jobs (for example on Kubernetes clusters) 
> and act as gateways to the Flink REST endpoint and the Flink web interface. 
> To prevent unauthorised access to Flink, the connectivity has to be secured. 
> As the tools act as gateways, it is easy to create a shared keystore and 
> truststore used for mutual authentication and pass them to the Flink 
> instances' configurations.
> To enable SSL mutual authentication on REST endpoints, I am suggesting to 
> add the configuration parameter `security.ssl.rest.authentication-enabled`, 
> which defaults to `false`.
>  If it is set to `true` the `SSLUtils` factories for creating the REST server 
> endpoint and the REST clients should set authentication to required and share 
> `security.ssl.rest.keystore` and `security.ssl.rest.truststore` to setup SSL 
> mutual authenticated connections.
>  
> I have a working prototype which I would gladly submit as a PR to get further 
> feedback. The changes to Flink are minimal and the default behaviour won't 
> change.
>  
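A rough sketch of how such a setup could be expressed programmatically, added here 
only for illustration: the keystore/truststore paths are placeholders, 
`security.ssl.rest.enabled` is written from memory and may differ by version, and 
`security.ssl.rest.authentication-enabled` is the key proposed in this ticket, so 
it does not exist yet.
{code:java}
import org.apache.flink.configuration.Configuration;

public class RestMutualAuthConfigSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();

        // Existing REST SSL settings (values are placeholders).
        config.setBoolean("security.ssl.rest.enabled", true);
        config.setString("security.ssl.rest.keystore", "/path/to/rest.keystore");
        config.setString("security.ssl.rest.truststore", "/path/to/rest.truststore");

        // Proposed flag from this ticket: require clients to present a trusted certificate.
        config.setBoolean("security.ssl.rest.authentication-enabled", true);

        System.out.println(config);
    }
}
{code}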



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kleingeist commented on a change in pull request #6727: [FLINK-10371] Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-25 Thread GitBox
kleingeist commented on a change in pull request #6727: [FLINK-10371] Allow to 
enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#discussion_r220081405
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
 ##
 @@ -78,6 +78,29 @@ public void checkEnableSSL() {
assertFalse(SSLUtils.isRestSSLEnabled(precedence));
}
 
+   /**
+* Tests whether activation of REST mutual SSL authentication evaluates 
the config flags correctly.
+*/
+   @SuppressWarnings("deprecation")
 
 Review comment:
   the `SSL_ENABLED` option is deprecated, but suppressing the warning with the 
annotation is a bad way of handling this; I should rather use the new options, as 
the fallback is already covered in another test


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10371) Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626910#comment-16626910
 ] 

ASF GitHub Bot commented on FLINK-10371:


kleingeist commented on a change in pull request #6727: [FLINK-10371] Allow to 
enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#discussion_r220081405
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/net/SSLUtilsTest.java
 ##
 @@ -78,6 +78,29 @@ public void checkEnableSSL() {
assertFalse(SSLUtils.isRestSSLEnabled(precedence));
}
 
+   /**
+* Tests whether activation of REST mutual SSL authentication evaluates 
the config flags correctly.
+*/
+   @SuppressWarnings("deprecation")
 
 Review comment:
   the `SSL_ENABLED` option is deprecated, but suppressing the warning with the 
annotation is a bad way of handling this; I should rather use the new options, as 
the fallback is already covered in another test


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow to enable SSL mutual authentication on REST endpoints by configuration
> 
>
> Key: FLINK-10371
> URL: https://issues.apache.org/jira/browse/FLINK-10371
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, REST, Security
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Johannes Dillmann
>Assignee: Johannes Dillmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> With Flink 1.6 SSL mutual authentication was introduced for internal 
> connectivity in FLINK-9312. 
>  SSL support for external connectivity was also introduced in regard to 
> encryption of the connection and verification of the Flink REST endpoint from 
> the client side.
> But _mutual authentication between the REST endpoint and clients is not 
> supported yet_.
>  The [documentation suggests 
> |https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/security-ssl.html]
>  using a side car proxy to enable SSL mutual auth on the REST endpoint and 
> points out the advantages of using a feature rich proxy.
> While this is a good rationale, there are still important use cases for 
> support of  simple mutual authentication directly in Flink: Mainly support 
> for using standard images in a containerized environment.
> There are tools used to set up Flink jobs (for example on Kubernetes clusters) 
> and act as gateways to the Flink REST endpoint and the Flink web interface. 
> To prevent unauthorised access to Flink, the connectivity has to be secured. 
> As the tools act as gateways, it is easy to create a shared keystore and 
> truststore used for mutual authentication and pass them to the Flink 
> instances' configurations.
> To enable SSL mutual authentication on REST endpoints, I am suggesting to 
> add the configuration parameter `security.ssl.rest.authentication-enabled`, 
> which defaults to `false`.
>  If it is set to `true` the `SSLUtils` factories for creating the REST server 
> endpoint and the REST clients should set authentication to required and share 
> `security.ssl.rest.keystore` and `security.ssl.rest.truststore` to setup SSL 
> mutual authenticated connections.
>  
> I have a working prototype which I would gladly submit as a PR to get further 
> feedback. The changes to Flink are minimal and the default behaviour won't 
> change.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10315) Let JDBCAppendTableSink be built with java.sql.Connection

2018-09-25 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626912#comment-16626912
 ] 

vinoyang commented on FLINK-10315:
--

[~twalthr] any opinion?

> Let JDBCAppendTableSink be built with java.sql.Connection
> -
>
> Key: FLINK-10315
> URL: https://issues.apache.org/jira/browse/FLINK-10315
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
> Environment: I'm currently using Flink 1.6.0 Java.
>Reporter: François Lacombe
>Assignee: vinoyang
>Priority: Major
>
> Currently, JDBCAppendTableSink is built with methods like setDBUrl, 
> setUsername, setPassword... and so on.
> We can't use an existing Java SQL connection to build it.
> It may be great to add a setConnection() method to the builder class so as to 
> prevent sensitive data like the username or password from transiting through 
> large stacks from config connectors (often in main()) to JDBC sinks.
> Being able to provide only one object is far lighter than 4 or 5 strings.
>  
> Thanks in advance



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10383) Hadoop configurations on the classpath seep into the S3 file system configs

2018-09-25 Thread Stephan Ewen (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-10383.
--
Resolution: Fixed

Fixed in
  - 1.6.2 via 67fa338c2f4ff7cc5d28f9eac90f170aed7cefcc
  - 1.7.0 via 51f6f7f9ebab17cd9dfd7b68ded600ac67021652

> Hadoop configurations on the classpath seep into the S3 file system configs
> ---
>
> Key: FLINK-10383
> URL: https://issues.apache.org/jira/browse/FLINK-10383
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.6.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> The S3 connectors are based on a self-contained shaded Hadoop. By design, 
> they should only use config values from the Flink configuration.
> However, because Hadoop implicitly loads configs from the classpath, existing 
> "core-site.xml" files can interfere with the configuration in ways that are 
> not transparent to the user. We should ensure such configs are not loaded.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10383) Hadoop configurations on the classpath seep into the S3 file system configs

2018-09-25 Thread Stephan Ewen (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-10383.


> Hadoop configurations on the classpath seep into the S3 file system configs
> ---
>
> Key: FLINK-10383
> URL: https://issues.apache.org/jira/browse/FLINK-10383
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.6.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> The S3 connectors are based on a self-contained shaded Hadoop. By design, 
> they should only use config values from the Flink configuration.
> However, because Hadoop implicitly loads configs from the classpath, existing 
> "core-site.xml" files can interfere with the configuration in ways that are 
> not transparent to the user. We should ensure such configs are not loaded.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6728: [FLINK-10383] [s3] (release-1.6) Prevent Hadoop configs on classpath to interfere with S3 configuration

2018-09-25 Thread GitBox
StephanEwen commented on issue #6728: [FLINK-10383] [s3] (release-1.6) Prevent 
Hadoop configs on classpath to interfere with S3 configuration
URL: https://github.com/apache/flink/pull/6728#issuecomment-424235824
 
 
   Merged in 67fa338c2f4ff7cc5d28f9eac90f170aed7cefcc


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StephanEwen commented on issue #6726: [FLINK-9061] [s3] Add entropy injection to S3 file systems for checkpoints

2018-09-25 Thread GitBox
StephanEwen commented on issue #6726: [FLINK-9061] [s3] Add entropy injection 
to S3 file systems for checkpoints
URL: https://github.com/apache/flink/pull/6726#issuecomment-424235939
 
 
   Merged in d8a9c72ed675669363e6c7d7499abee30fc19b88


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10383) Hadoop configurations on the classpath seep into the S3 file system configs

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626914#comment-16626914
 ] 

ASF GitHub Bot commented on FLINK-10383:


StephanEwen commented on issue #6728: [FLINK-10383] [s3] (release-1.6) Prevent 
Hadoop configs on classpath to interfere with S3 configuration
URL: https://github.com/apache/flink/pull/6728#issuecomment-424235824
 
 
   Merged in 67fa338c2f4ff7cc5d28f9eac90f170aed7cefcc


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Hadoop configurations on the classpath seep into the S3 file system configs
> ---
>
> Key: FLINK-10383
> URL: https://issues.apache.org/jira/browse/FLINK-10383
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.6.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> The S3 connectors are based on a self-contained shaded Hadoop. By design, 
> they should only use config values from the Flink configuration.
> However, because Hadoop implicitly loads configs from the classpath, existing 
> "core-site.xml" files can interfere with the configuration in ways that are 
> not transparent to the user. We should ensure such configs are not loaded.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626916#comment-16626916
 ] 

ASF GitHub Bot commented on FLINK-9061:
---

StephanEwen commented on issue #6726: [FLINK-9061] [s3] Add entropy injection 
to S3 file systems for checkpoints
URL: https://github.com/apache/flink/pull/6726#issuecomment-424235939
 
 
   Merged in d8a9c72ed675669363e6c7d7499abee30fc19b88


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  
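A hedged, self-contained sketch of the path rewrite described above (plain Java, 
not the entropy-injection mechanism that was eventually merged): hash the original 
checkpoint path and prepend the hash as a key prefix.
{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class EntropyPathSketch {

    /** Rewrites s3://bucket/flink/checkpoints to s3://bucket/<hash>/flink/checkpoints. */
    static String injectEntropy(String originalPath) throws NoSuchAlgorithmException {
        // Split "s3://bucket/rest/of/key" into the bucket part and the key part.
        int schemeEnd = originalPath.indexOf("://") + 3;
        int keyStart = originalPath.indexOf('/', schemeEnd);
        String bucketPart = originalPath.substring(0, keyStart);
        String keyPart = originalPath.substring(keyStart + 1);

        // A short hash of the original path spreads the keys across S3 partitions.
        byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(originalPath.getBytes(StandardCharsets.UTF_8));
        String hash = String.format("%02x%02x%02x%02x",
                digest[0] & 0xFF, digest[1] & 0xFF, digest[2] & 0xFF, digest[3] & 0xFF);

        return bucketPart + "/" + hash + "/" + keyPart;
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // Prints something like s3://bucket/<8 hex chars>/flink/checkpoints
        System.out.println(injectEntropy("s3://bucket/flink/checkpoints"));
    }
}
{code}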



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-09-25 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626920#comment-16626920
 ] 

Stephan Ewen commented on FLINK-9061:
-

Fixed in
  - 1.7.0 via d8a9c72ed675669363e6c7d7499abee30fc19b88 for both {{s3-hadoop}} 
and {{s3-presto}}
  - 1.6.2 via a2b2041bf3885ee16d0333b4f153a12d435edff4 for only {{s3-presto}}.

> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10413) requestPartitionState messages overwhelms JM RPC main thread

2018-09-25 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang closed FLINK-10413.

Resolution: Duplicate

> requestPartitionState messages overwhelms JM RPC main thread
> 
>
> Key: FLINK-10413
> URL: https://issues.apache.org/jira/browse/FLINK-10413
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Assignee: vinoyang
>Priority: Major
>
> We tried to benchmark the job scheduling performance with a 2000x2000 
> ALL-to-ALL streaming (EAGER) job. The input data is empty, so the tasks 
> finish soon after they start.
> In this case we see slow RPC responses, and TM/RM heartbeats to the JM 
> eventually time out.
> We find ~2,000,000 requestPartitionState messages triggered by 
> triggerPartitionProducerStateCheck in a short time, which overwhelms JM RPC 
> main thread. This is because downstream tasks can be started earlier than 
> upstream tasks in EAGER scheduling.
>  
> We'd suggest removing the partition producer state check to avoid this issue. The task 
> can just keep waiting for a while and retrying if the partition does not 
> exist. There are two cases when the partition does not exist:
>  # the partition is not started yet
>  # the partition is failed
> In case 1, retry works. In case 2, a task failover will soon happen and 
> cancel the downstream tasks as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6302: [FLINK-9061][checkpointing] add entropy to s3 path for better scalability

2018-09-25 Thread GitBox
StephanEwen commented on issue #6302:  [FLINK-9061][checkpointing] add entropy 
to s3 path for better scalability
URL: https://github.com/apache/flink/pull/6302#issuecomment-424241775
 
 
   I have adjusted this and merged it for `release-1.6` and `master`.
   Docs are under 
https://github.com/apache/flink/blob/master/docs/ops/filesystems.md#entropy-injection-for-s3-file-systems
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626941#comment-16626941
 ] 

ASF GitHub Bot commented on FLINK-9061:
---

StephanEwen commented on issue #6302:  [FLINK-9061][checkpointing] add entropy 
to s3 path for better scalability
URL: https://github.com/apache/flink/pull/6302#issuecomment-424241775
 
 
   I have adjusted this and merged it for `release-1.6` and `master`.
   Docs are under 
https://github.com/apache/flink/blob/master/docs/ops/filesystems.md#entropy-injection-for-s3-file-systems
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-25 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r220092435
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import 
org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.mapping.Mapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the {@link 
CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API.
+ *
+ * The example assumes that a table exists in a local cassandra database, 
according to the following queries:
+ * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 
'SimpleStrategy', 'replication_factor': '1'};
+ * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY 
KEY(number, strings));
+ */
+public class BatchPojoExample {
+   private static final String INSERT_QUERY = "INSERT INTO test.batches 
(number, strings) VALUES (?,?);";
+   private static final String SELECT_QUERY = "SELECT number, strings FROM 
test.batches;";
+
+   /*
+*  table script: "CREATE TABLE test.batches (number int, strings 
text, PRIMARY KEY(number, strings));"
+*/
+   public static void main(String[] args) throws Exception {
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+
+   ArrayList<Tuple2<Integer, String>> collection = new 
ArrayList<>(20);
+   for (int i = 0; i < 20; i++) {
+   collection.add(new Tuple2<>(i, "string " + i));
+   }
+
+   DataSet<Tuple2<Integer, String>> dataSet = 
env.fromCollection(collection);
+
+   dataSet.output(new CassandraTupleOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, new ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return 
builder.addContactPoints("127.0.0.1").build();
+   }
+   }));
+
 
 Review comment:
   I think it should first create the output and then execute:
   ```
   dataSet.output(new CassandraTupleOutputFormat<>(INSERT_QUERY, 
clusterBuilder));
   env.execute("Write");
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kleingeist commented on a change in pull request #6727: [FLINK-10371] Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-25 Thread GitBox
kleingeist commented on a change in pull request #6727: [FLINK-10371] Allow to 
enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#discussion_r220092612
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
 ##
 @@ -275,6 +340,10 @@ public static SSLContext 
createRestClientSSLContext(Configuration config) throws
return null;
}
 
+   if (isRestSSLAuthenticationEnabled(config)) {
+   return createRestAuthenticationSSLContext(config);
+   }
+
 
 Review comment:
   Do you have a hint on what is preferred in Flink:
   - passing ints that may be set using constants such as `AUTH_CONFIG_MODE_CLIENT`

   - using an enum?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626964#comment-16626964
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r220092435
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/example/BatchPojoExample.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra.example;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.batch.connectors.cassandra.CassandraPojoInputFormat;
+import org.apache.flink.batch.connectors.cassandra.CassandraTupleOutputFormat;
+import 
org.apache.flink.batch.connectors.cassandra.CustomCassandraAnnotatedPojo;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.mapping.Mapper;
+
+import java.util.ArrayList;
+
+/**
+ * This is an example showing how to use the {@link 
CassandraPojoInputFormat}-/CassandraOutputFormats in the Batch API.
+ *
+ * The example assumes that a table exists in a local cassandra database, 
according to the following queries:
+ * CREATE KEYSPACE IF NOT EXISTS test WITH replication = {'class': 
'SimpleStrategy', 'replication_factor': '1'};
+ * CREATE TABLE IF NOT EXISTS test.batches (number int, strings text, PRIMARY 
KEY(number, strings));
+ */
+public class BatchPojoExample {
+   private static final String INSERT_QUERY = "INSERT INTO test.batches 
(number, strings) VALUES (?,?);";
+   private static final String SELECT_QUERY = "SELECT number, strings FROM 
test.batches;";
+
+   /*
+*  table script: "CREATE TABLE test.batches (number int, strings 
text, PRIMARY KEY(number, strings));"
+*/
+   public static void main(String[] args) throws Exception {
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(1);
+
+   ArrayList<Tuple2<Integer, String>> collection = new 
ArrayList<>(20);
+   for (int i = 0; i < 20; i++) {
+   collection.add(new Tuple2<>(i, "string " + i));
+   }
+
+   DataSet<Tuple2<Integer, String>> dataSet = 
env.fromCollection(collection);
+
+   dataSet.output(new CassandraTupleOutputFormat<Tuple2<Integer, String>>(INSERT_QUERY, new ClusterBuilder() {
+   @Override
+   protected Cluster buildCluster(Cluster.Builder builder) 
{
+   return 
builder.addContactPoints("127.0.0.1").build();
+   }
+   }));
+
 
 Review comment:
   I think it should first create the output and then execute:
   ```
   dataSet.output(new CassandraTupleOutputFormat<>(INSERT_QUERY, 
clusterBuilder));
   env.execute("Write");
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-r

[jira] [Commented] (FLINK-10371) Allow to enable SSL mutual authentication on REST endpoints by configuration

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626965#comment-16626965
 ] 

ASF GitHub Bot commented on FLINK-10371:


kleingeist commented on a change in pull request #6727: [FLINK-10371] Allow to 
enable SSL mutual authentication on REST endpoints by configuration
URL: https://github.com/apache/flink/pull/6727#discussion_r220092612
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java
 ##
 @@ -275,6 +340,10 @@ public static SSLContext 
createRestClientSSLContext(Configuration config) throws
return null;
}
 
+   if (isRestSSLAuthenticationEnabled(config)) {
+   return createRestAuthenticationSSLContext(config);
+   }
+
 
 Review comment:
   Do you have a hint on what is preferred in Flink:
   - passing ints that may be set using constants such as `AUTH_CONFIG_MODE_CLIENT`

   - using an enum?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow to enable SSL mutual authentication on REST endpoints by configuration
> 
>
> Key: FLINK-10371
> URL: https://issues.apache.org/jira/browse/FLINK-10371
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, REST, Security
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Johannes Dillmann
>Assignee: Johannes Dillmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> With Flink 1.6 SSL mutual authentication was introduced for internal 
> connectivity in FLINK-9312. 
>  SSL support for external connectivity was also introduced in regard to 
> encryption of the connection and verification of the Flink REST endpoint from 
> the client side.
> But _mutual authentication between the REST endpoint and clients is not 
> supported yet_.
>  The [documentation suggests 
> |https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/security-ssl.html]
>  using a side car proxy to enable SSL mutual auth on the REST endpoint and 
> points out the advantages of using a feature rich proxy.
> While this is a good rationale, there are still important use cases for 
> supporting simple mutual authentication directly in Flink: mainly support 
> for using standard images in a containerized environment.
> There are tools used to set up Flink jobs (for example on Kubernetes clusters) 
> that act as gateways to the Flink REST endpoint and the Flink web interface. 
> To prevent unauthorised access to Flink, the connectivity has to be secured. 
> As the tool acts as a gateway, it is easy to create and pass a shared keystore 
> and truststore used for mutual authentication to the Flink instances' 
> configurations.
> To enable SSL mutual authentication on REST endpoints, I am suggesting to 
> add the configuration parameter `security.ssl.rest.authentication-enabled`, 
> which defaults to `false`.
>  If it is set to `true`, the `SSLUtils` factories for creating the REST server 
> endpoint and the REST clients should set authentication to required and share 
> `security.ssl.rest.keystore` and `security.ssl.rest.truststore` to set up SSL 
> mutually authenticated connections.
>  
> I have a working prototype which I would gladly submit as a PR to get further 
> feedback. The changes to Flink are minimal and the default behaviour won't 
> change.
>  
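A minimal sketch of the proposed behaviour; the class and method names below are made up for illustration and do not reflect the actual `SSLUtils` changes. When the new flag is `true`, the server-side SSL engine simply requires client certificates:

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

import org.apache.flink.configuration.Configuration;

public final class RestMutualAuthSketch {

    public static SSLEngine createRestServerEngine(Configuration config, SSLContext restSslContext) {
        SSLEngine engine = restSslContext.createSSLEngine();
        engine.setUseClientMode(false);

        // defaults to false, so existing setups keep working unchanged
        boolean mutualAuth = config.getBoolean("security.ssl.rest.authentication-enabled", false);
        if (mutualAuth) {
            engine.setNeedClientAuth(true);
        }
        return engine;
    }
}
```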



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-09-25 Thread GitBox
azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r220093326
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -470,6 +472,34 @@ public void testCassandraTableSink() throws Exception {
Assert.assertTrue("The input data was not completely written to 
Cassandra", input.isEmpty());
}
 
+   @Test
+   public void testCassandraBatchPojoFormat() throws Exception {
+
+   OutputFormat<Tuple3<String, Integer, Integer>> sink = new 
CassandraTupleOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+   sink.configure(new Configuration());
+   sink.open(0, 1);
+
+   for (Tuple3<String, Integer, Integer> value : collection) {
+   sink.writeRecord(value);
+   }
+
+   sink.close();
+
+   InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source = 
new CassandraPojoInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder, 
CustomCassandraAnnotatedPojo.class);
+   source.configure(new Configuration());
+   source.open(null);
+
+   List<CustomCassandraAnnotatedPojo> result = new ArrayList<>();
+
+   while (!source.reachedEnd()) {
+   CustomCassandraAnnotatedPojo temp = 
source.nextRecord(new CustomCassandraAnnotatedPojo());
+   result.add(temp);
+   }
+
+   source.close();
+   Assert.assertEquals(20, result.size());
 
 Review comment:
   maybe the example would also be more consistent if we used `CassandraPojoSink` 
there as well to populate the table, instead of raw tuples.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9126) Creation of the CassandraPojoInputFormat class to output data into a Custom Cassandra Annotated Pojo

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626967#comment-16626967
 ] 

ASF GitHub Bot commented on FLINK-9126:
---

azagrebin commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r220093326
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ##
 @@ -470,6 +472,34 @@ public void testCassandraTableSink() throws Exception {
Assert.assertTrue("The input data was not completely written to 
Cassandra", input.isEmpty());
}
 
+   @Test
+   public void testCassandraBatchPojoFormat() throws Exception {
+
+   OutputFormat<Tuple3<String, Integer, Integer>> sink = new 
CassandraTupleOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+   sink.configure(new Configuration());
+   sink.open(0, 1);
+
+   for (Tuple3<String, Integer, Integer> value : collection) {
+   sink.writeRecord(value);
+   }
+
+   sink.close();
+
+   InputFormat<CustomCassandraAnnotatedPojo, InputSplit> source = 
new CassandraPojoInputFormat<>(injectTableName(SELECT_DATA_QUERY), builder, 
CustomCassandraAnnotatedPojo.class);
+   source.configure(new Configuration());
+   source.open(null);
+
+   List<CustomCassandraAnnotatedPojo> result = new ArrayList<>();
+
+   while (!source.reachedEnd()) {
+   CustomCassandraAnnotatedPojo temp = 
source.nextRecord(new CustomCassandraAnnotatedPojo());
+   result.add(temp);
+   }
+
+   source.close();
+   Assert.assertEquals(20, result.size());
 
 Review comment:
   maybe the example would also be more consistent if we used `CassandraPojoSink` 
there as well to populate the table, instead of raw tuples.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> 
>
> Key: FLINK-9126
> URL: https://issues.apache.org/jira/browse/FLINK-9126
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jeffrey Carter
>Assignee: Jeffrey Carter
>Priority: Minor
>  Labels: InputFormat, cassandra, features, pull-request-available
> Fix For: 1.7.0
>
> Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently the DataSet API only has the ability to output data received from 
> Cassandra as a source as a Tuple. This would allow the data to be 
> output as a custom POJO that the user has created and annotated 
> using the Datastax API. This would remove the need for very long Tuples to be 
> created by the DataSet and then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would 
> require importing the Datastax API into the class-. Another option is to make 
> a similar, but slightly different class called CassandraPojoInputFormat.
> I have already gotten code for this working in my own project, but want other 
> thoughts as to the best way this should go about being implemented.
>  
> //Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat = new 
> CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, 
> CustomCassandraPojo.class);
>  cassandraInputFormat.configure(null);
>  cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet = 
> exEnv.createInput(cassandraInputFormat, TypeInformation.of(new 
> TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10414) Add skip to next strategy

2018-09-25 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-10414:


 Summary: Add skip to next strategy
 Key: FLINK-10414
 URL: https://issues.apache.org/jira/browse/FLINK-10414
 Project: Flink
  Issue Type: Improvement
  Components: CEP
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.7.0


Add a skip-to-next strategy that should discard all partial matches that started 
with the same element as the found match.
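Once this lands, usage would presumably look like the sketch below; the factory method name `skipToNext()` and the package layout are assumptions based on the existing `AfterMatchSkipStrategy` API and this ticket:

```java
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

public final class SkipToNextSketch {

    public static Pattern<String, ?> pattern() {
        // discard all partial matches that started at the same element as the found match
        AfterMatchSkipStrategy skipToNext = AfterMatchSkipStrategy.skipToNext();

        return Pattern.<String>begin("start", skipToNext)
            .where(new SimpleCondition<String>() {
                @Override
                public boolean filter(String value) {
                    return value.startsWith("a");
                }
            })
            .oneOrMore();
    }
}
```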



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys opened a new pull request #6756: [FLINK-10414] Added skip to next strategy

2018-09-25 Thread GitBox
dawidwys opened a new pull request #6756: [FLINK-10414] Added skip to next 
strategy
URL: https://github.com/apache/flink/pull/6756
 
 
   ## What is the purpose of the change
   
   This pull request adds support for SKIP_TO_NEXT, which will fully cover all 
variants of the AFTER MATCH clause in SQL. This strategy is important e.g. for 
reluctant quantifiers at the end, as after the first, shortest match, all other matches 
started at the same element will be discarded.
   
   ## Brief change log
   
 * Added `SkipToNextStrategy`
 * Extended documentation with SKIP_TO_NEXT description
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   * 
`org.apache.flink.cep.nfa.AfterMatchSkipITCase#testNoSkipWithQuantifierAtTheEnd`
   * 
`org.apache.flink.cep.nfa.AfterMatchSkipITCase#testSkipToNextWithQuantifierAtTheEnd`
   * `org.apache.flink.cep.nfa.AfterMatchSkipITCase#testNoSkipWithFollowedByAny`
   * 
`org.apache.flink.cep.nfa.AfterMatchSkipITCase#testSkipToNextWithFollowedByAny`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
**JavaDocs** / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10414) Add skip to next strategy

2018-09-25 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10414:
---
Labels: pull-request-available  (was: )

> Add skip to next strategy
> -
>
> Key: FLINK-10414
> URL: https://issues.apache.org/jira/browse/FLINK-10414
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add a skip-to-next strategy that should discard all partial matches that 
> started with the same element as the found match.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10414) Add skip to next strategy

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627005#comment-16627005
 ] 

ASF GitHub Bot commented on FLINK-10414:


dawidwys opened a new pull request #6756: [FLINK-10414] Added skip to next 
strategy
URL: https://github.com/apache/flink/pull/6756
 
 
   ## What is the purpose of the change
   
   This pull request adds support for SKIP_TO_NEXT, which will fully cover all 
variants of the AFTER MATCH clause in SQL. This strategy is important e.g. for 
reluctant quantifiers at the end, as after the first, shortest match, all other matches 
started at the same element will be discarded.
   
   ## Brief change log
   
 * Added `SkipToNextStrategy`
 * Extended documentation with SKIP_TO_NEXT description
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   * 
`org.apache.flink.cep.nfa.AfterMatchSkipITCase#testNoSkipWithQuantifierAtTheEnd`
   * 
`org.apache.flink.cep.nfa.AfterMatchSkipITCase#testSkipToNextWithQuantifierAtTheEnd`
   * `org.apache.flink.cep.nfa.AfterMatchSkipITCase#testNoSkipWithFollowedByAny`
   * 
`org.apache.flink.cep.nfa.AfterMatchSkipITCase#testSkipToNextWithFollowedByAny`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
**JavaDocs** / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add skip to next strategy
> -
>
> Key: FLINK-10414
> URL: https://issues.apache.org/jira/browse/FLINK-10414
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add a skip-to-next strategy that should discard all partial matches that 
> started with the same element as the found match.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #6751: [FLINK-10403] Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base

2018-09-25 Thread GitBox
tillrohrmann commented on a change in pull request #6751: [FLINK-10403] Port 
JobManagerHAProcessFailureBatchRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/6751#discussion_r220107840
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
 ##
 @@ -212,7 +216,8 @@ public void flatMap(Long value, Collector out) 
throws Exception {
}
 
@Test
-   public void testJobManagerProcessFailure() throws Exception {
+   public void testDispatcherProcessFailure() throws Exception {
 
 Review comment:
   I think I've found the culprit. It seems as if the `RestClient` does not 
time out an established HTTP connection to the server. Due to that, it can 
happen that we just established the connection when the `DispatcherProcess` is 
killed. This apparently won't be noticed by Netty in time for the test to 
finish. I'm still investigating why Netty cannot detect an unreachable remote 
host.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10403) Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627022#comment-16627022
 ] 

ASF GitHub Bot commented on FLINK-10403:


tillrohrmann commented on a change in pull request #6751: [FLINK-10403] Port 
JobManagerHAProcessFailureBatchRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/6751#discussion_r220107840
 
 

 ##
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
 ##
 @@ -212,7 +216,8 @@ public void flatMap(Long value, Collector out) 
throws Exception {
}
 
@Test
-   public void testJobManagerProcessFailure() throws Exception {
+   public void testDispatcherProcessFailure() throws Exception {
 
 Review comment:
   I think I've found the culprit. It seems as if the `RestClient` does not 
time out an established HTTP connection to the server. Due to that, it can 
happen that we just established the connection when the `DispatcherProcess` is 
killed. This apparently won't be noticed by Netty in time for the test to 
finish. I'm still investigating why Netty cannot detect an unreachable remote 
host.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port JobManagerHAProcessFailureBatchRecoveryITCase to new code base
> ---
>
> Key: FLINK-10403
> URL: https://issues.apache.org/jira/browse/FLINK-10403
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{JobManagerHAProcessFailureBatchRecoveryITCase}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10415) RestClient does not react to lost connection

2018-09-25 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10415:
-

 Summary: RestClient does not react to lost connection
 Key: FLINK-10415
 URL: https://issues.apache.org/jira/browse/FLINK-10415
 Project: Flink
  Issue Type: Bug
  Components: REST
Affects Versions: 1.5.4, 1.6.1, 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.7.0, 1.6.2, 1.5.5


While working on FLINK-10403, I noticed that Flink's {{RestClient}} does not 
seem to react to a lost connection in time. When sending a request to the 
current leader, it happened that the leader was killed just after establishing 
the connection. Then the {{RestClient}} did not fail the connection and was 
stuck in writing a request or retrieving a response from the lost leader. I'm 
wondering whether we should introduce a {{ReadTimeoutHandler}} and 
{{WriteTimeoutHandler}} to handle these problems.
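The handlers mentioned above are stock Netty classes; the sketch below shows roughly where they could sit in a client pipeline. Where exactly they would go in Flink's {{RestClient}} is an open question, so this is only an illustration:

```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

public class TimeoutAwareInitializer extends ChannelInitializer<SocketChannel> {

    private final int timeoutSeconds;

    public TimeoutAwareInitializer(int timeoutSeconds) {
        this.timeoutSeconds = timeoutSeconds;
    }

    @Override
    protected void initChannel(SocketChannel ch) {
        // fire an exception (and fail pending requests) if the remote side goes silent,
        // instead of hanging on a connection to a killed leader
        ch.pipeline()
            .addLast(new ReadTimeoutHandler(timeoutSeconds))
            .addLast(new WriteTimeoutHandler(timeoutSeconds));
        // ... the HTTP codec and the REST response handlers would follow here
    }
}
```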



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-09-25 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-9891:
---

Assignee: Andrey Zagrebin  (was: Shuyi Chen)

> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
>
> We are not using session mode and detached mode. The command to run Flink job 
> on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-07-18 12:47:04,783 WARN 
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 2018-07-18 12:47:04,788 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration 
> directory 
> ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf')
>  contains both LOG4J and Logback configuration files. Please delete or rename 
> one of them.
> 2018-07-18 12:47:07,846 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application 
> master application_1531474158783_10814
> 2018-07-18 12:47:08,073 INFO 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application 
> application_1531474158783_10814
> 2018-07-18 12:47:08,074 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster 
> to be allocated
> 2018-07-18 12:47:08,076 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, 
> current state ACCEPTED
> 2018-07-18 12:47:12,864 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has 
> been deployed successfully.
> {code}
> Job Manager logs:
> {code:java}
> 2018-07-18 12:47:09,913 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> 
> 2018-07-18 12:47:09,915 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting 
> YarnSessionClusterEntrypoint (Version: 1.5.1, Rev:3488f8b, Date:10.07.2018 @ 
> 11:51:27 GMT)
> ...
> {code}
> Issues:
>  # Flink job is running as a Flink session
>  # Ctrl+C or 'stop' doesn't stop a job and YARN cluster
>  # Cancel job via Job Manager web UI doesn't stop Flink cluster. To kill the 
> cluster we need to run: yarn application -kill 
> We also tried to run a flink job with 'mode: legacy' and we have the same 
> issues:
>  # Add property 'mode: legacy' to ./conf/flink-conf.yaml
>  # Execute the following command:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI lo

[jira] [Assigned] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-09-25 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-9891:
---

Assignee: Gary Yao  (was: Andrey Zagrebin)

> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>  Components: Client, YARN
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> We are not using session mode and detached mode. The command to run Flink job 
> on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-07-18 12:47:04,783 WARN 
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 2018-07-18 12:47:04,788 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration 
> directory 
> ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf')
>  contains both LOG4J and Logback configuration files. Please delete or rename 
> one of them.
> 2018-07-18 12:47:07,846 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application 
> master application_1531474158783_10814
> 2018-07-18 12:47:08,073 INFO 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application 
> application_1531474158783_10814
> 2018-07-18 12:47:08,074 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster 
> to be allocated
> 2018-07-18 12:47:08,076 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, 
> current state ACCEPTED
> 2018-07-18 12:47:12,864 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has 
> been deployed successfully.
> {code}
> Job Manager logs:
> {code:java}
> 2018-07-18 12:47:09,913 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> 
> 2018-07-18 12:47:09,915 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting 
> YarnSessionClusterEntrypoint (Version: 1.5.1, Rev:3488f8b, Date:10.07.2018 @ 
> 11:51:27 GMT)
> ...
> {code}
> Issues:
>  # Flink job is running as a Flink session
>  # Ctrl+C or 'stop' doesn't stop a job and YARN cluster
>  # Cancel job via Job Manager web UI doesn't stop Flink cluster. To kill the 
> cluster we need to run: yarn application -kill 
> We also tried to run a flink job with 'mode: legacy' and we have the same 
> issues:
>  # Add property 'mode: legacy' to ./conf/flink-conf.yaml
>  # Execute the following command:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quicks

[jira] [Assigned] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-09-25 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-9891:
---

Assignee: Andrey Zagrebin  (was: Gary Yao)

> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>  Components: Client, YARN
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> We are not using session mode and detached mode. The command to run Flink job 
> on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-07-18 12:47:04,783 WARN 
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 2018-07-18 12:47:04,788 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration 
> directory 
> ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf')
>  contains both LOG4J and Logback configuration files. Please delete or rename 
> one of them.
> 2018-07-18 12:47:07,846 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application 
> master application_1531474158783_10814
> 2018-07-18 12:47:08,073 INFO 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application 
> application_1531474158783_10814
> 2018-07-18 12:47:08,074 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster 
> to be allocated
> 2018-07-18 12:47:08,076 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, 
> current state ACCEPTED
> 2018-07-18 12:47:12,864 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has 
> been deployed successfully.
> {code}
> Job Manager logs:
> {code:java}
> 2018-07-18 12:47:09,913 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> 
> 2018-07-18 12:47:09,915 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting 
> YarnSessionClusterEntrypoint (Version: 1.5.1, Rev:3488f8b, Date:10.07.2018 @ 
> 11:51:27 GMT)
> ...
> {code}
> Issues:
>  # Flink job is running as a Flink session
>  # Ctrl+C or 'stop' doesn't stop a job and YARN cluster
>  # Cancel job via Job Manager web UI doesn't stop Flink cluster. To kill the 
> cluster we need to run: yarn application -kill 
> We also tried to run a flink job with 'mode: legacy' and we have the same 
> issues:
>  # Add property 'mode: legacy' to ./conf/flink-conf.yaml
>  # Execute the following command:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink

[jira] [Updated] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-09-25 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-9891:

Fix Version/s: 1.5.5
   1.6.2
   1.7.0

> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>  Components: Client, YARN
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> We are not using session mode and detached mode. The command to run Flink job 
> on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-07-18 12:47:04,783 WARN 
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 2018-07-18 12:47:04,788 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration 
> directory 
> ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf')
>  contains both LOG4J and Logback configuration files. Please delete or rename 
> one of them.
> 2018-07-18 12:47:07,846 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application 
> master application_1531474158783_10814
> 2018-07-18 12:47:08,073 INFO 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application 
> application_1531474158783_10814
> 2018-07-18 12:47:08,074 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster 
> to be allocated
> 2018-07-18 12:47:08,076 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, 
> current state ACCEPTED
> 2018-07-18 12:47:12,864 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has 
> been deployed successfully.
> {code}
> Job Manager logs:
> {code:java}
> 2018-07-18 12:47:09,913 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> 
> 2018-07-18 12:47:09,915 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting 
> YarnSessionClusterEntrypoint (Version: 1.5.1, Rev:3488f8b, Date:10.07.2018 @ 
> 11:51:27 GMT)
> ...
> {code}
> Issues:
>  # Flink job is running as a Flink session
>  # Ctrl+C or 'stop' doesn't stop a job and YARN cluster
>  # Cancel job via Job Manager web UI doesn't stop Flink cluster. To kill the 
> cluster we need to run: yarn application -kill 
> We also tried to run a flink job with 'mode: legacy' and we have the same 
> issues:
>  # Add property 'mode: legacy' to ./conf/flink-conf.yaml
>  # Execute the following command:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -y

[jira] [Updated] (FLINK-9891) Flink cluster is not shutdown in YARN mode when Flink client is stopped

2018-09-25 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-9891:

Component/s: YARN
 Client

> Flink cluster is not shutdown in YARN mode when Flink client is stopped
> ---
>
> Key: FLINK-9891
> URL: https://issues.apache.org/jira/browse/FLINK-9891
> Project: Flink
>  Issue Type: Bug
>  Components: Client, YARN
>Affects Versions: 1.5.0, 1.5.1
>Reporter: Sergey Krasovskiy
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> We are not using session mode and detached mode. The command to run Flink job 
> on YARN is:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickstart-java-1.0-SNAPSHOT.jar -c org.test.WordCount
> {code}
> Flink CLI logs:
> {code:java}
> Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/hdp/2.4.2.10-1/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-07-18 12:47:03,747 INFO 
> org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl - Timeline service 
> address: http://hmaster-1.ipbl.rgcloud.net:8188/ws/v1/timeline/
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,222 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-07-18 12:47:04,248 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Neither the 
> HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink 
> YARN Client needs one of these to be set to properly load the Hadoop 
> configuration for accessing YARN.
> 2018-07-18 12:47:04,409 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: 
> ClusterSpecification{masterMemoryMB=768, taskManagerMemoryMB=2048, 
> numberTaskManagers=1, slotsPerTaskManager=1}
> 2018-07-18 12:47:04,783 WARN 
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit 
> local reads feature cannot be used because libhadoop cannot be loaded.
> 2018-07-18 12:47:04,788 WARN 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration 
> directory 
> ('/opt/flink-streaming/flink-streaming-1.5.1-1.5.1-bin-hadoop27-scala_2.11-1531485329/conf')
>  contains both LOG4J and Logback configuration files. Please delete or rename 
> one of them.
> 2018-07-18 12:47:07,846 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application 
> master application_1531474158783_10814
> 2018-07-18 12:47:08,073 INFO 
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application 
> application_1531474158783_10814
> 2018-07-18 12:47:08,074 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster 
> to be allocated
> 2018-07-18 12:47:08,076 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, 
> current state ACCEPTED
> 2018-07-18 12:47:12,864 INFO 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has 
> been deployed successfully.
> {code}
> Job Manager logs:
> {code:java}
> 2018-07-18 12:47:09,913 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - 
> 
> 2018-07-18 12:47:09,915 INFO 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting 
> YarnSessionClusterEntrypoint (Version: 1.5.1, Rev:3488f8b, Date:10.07.2018 @ 
> 11:51:27 GMT)
> ...
> {code}
> Issues:
>  # Flink job is running as a Flink session
>  # Ctrl+C or 'stop' doesn't stop a job and YARN cluster
>  # Cancel job via Job Manager web UI doesn't stop Flink cluster. To kill the 
> cluster we need to run: yarn application -kill 
> We also tried to run a flink job with 'mode: legacy' and we have the same 
> issues:
>  # Add property 'mode: legacy' to ./conf/flink-conf.yaml
>  # Execute the following command:
> {code:java}
> /bin/flink run -m yarn-cluster -yn 1 -yqu flink -yjm 768 -ytm 
> 2048 -j ./flink-quickst

[GitHub] StefanRRichter commented on issue #6746: [FLINK-10394][build] Remove legacy mode from Travis build matrix

2018-09-25 Thread GitBox
StefanRRichter commented on issue #6746: [FLINK-10394][build] Remove legacy 
mode from Travis build matrix
URL: https://github.com/apache/flink/pull/6746#issuecomment-424266907
 
 
   LGTM 👍


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10394) Remove legacy mode testing profiles from Travis config

2018-09-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627045#comment-16627045
 ] 

ASF GitHub Bot commented on FLINK-10394:


StefanRRichter commented on issue #6746: [FLINK-10394][build] Remove legacy 
mode from Travis build matrix
URL: https://github.com/apache/flink/pull/6746#issuecomment-424266907
 
 
   LGTM 👍


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove legacy mode testing profiles from Travis config
> --
>
> Key: FLINK-10394
> URL: https://issues.apache.org/jira/browse/FLINK-10394
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: 
> 0001-FLINK-10394-Remove-legacy-mode-testing-profiles-from.patch
>
>
> Remove the legacy mode testing profiles from Travis config.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter removed a comment on issue #6746: [FLINK-10394][build] Remove legacy mode from Travis build matrix

2018-09-25 Thread GitBox
StefanRRichter removed a comment on issue #6746: [FLINK-10394][build] Remove 
legacy mode from Travis build matrix
URL: https://github.com/apache/flink/pull/6746#issuecomment-424266907
 
 
   LGTM 👍


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

