[jira] [Commented] (FLINK-11261) BlobServer moves file with open OutputStream

2019-03-07 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787627#comment-16787627
 ] 

vinoyang commented on FLINK-11261:
--

Hi [~Zentol], what do you think of this change?
{code:java}
try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
	// read stream
	byte[] buf = new byte[BUFFER_SIZE];
	while (true) {
		final int bytesRead = inputStream.read(buf);
		if (bytesRead == -1) {
			// done
			break;
		}
		fos.write(buf, 0, bytesRead);
		md.update(buf, 0, bytesRead);
	}
} finally {
	try {
		blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
		return blobKey;
	} finally {
		// delete incomingFile from a failed download
		if (!incomingFile.delete() && incomingFile.exists()) {
			LOG.warn("Could not delete the staging file {} for blob key {} and job {}.",
				incomingFile, blobKey, jobId);
		}
	}
}
{code}
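For comparison, a variant where the move happens only after try-with-resources has closed the stream could look like the sketch below. This is a simplified illustration, not the actual BlobServer code: the class and method names are hypothetical, and the checksum/blobKey handling is elided.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public final class MoveAfterClose {

	/**
	 * Copies the stream into the staging file, then moves it to the target
	 * only after the FileOutputStream has been closed by try-with-resources,
	 * so the file is never moved while a handle on it is still open.
	 */
	static Path downloadAndMove(InputStream in, Path staging, Path target) throws IOException {
		try (FileOutputStream fos = new FileOutputStream(staging.toFile())) {
			byte[] buf = new byte[4096];
			int bytesRead;
			while ((bytesRead = in.read(buf)) != -1) {
				fos.write(buf, 0, bytesRead);
			}
		} // fos is closed here, before any move is attempted
		try {
			return Files.move(staging, target, StandardCopyOption.REPLACE_EXISTING);
		} finally {
			// cleans up the staging file only if the move above failed
			Files.deleteIfExists(staging);
		}
	}
}
```

On Windows this matters because an open `FileOutputStream` holds a lock that makes the rename fail, which is what the reported test failures show.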
 

> BlobServer moves file with open OutputStream
> 
>
> Key: FLINK-11261
> URL: https://issues.apache.org/jira/browse/FLINK-11261
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Operators
>Affects Versions: 1.8.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.8.0
>
>
> Various tests fail on Windows because the BlobServer attempts to move a file 
> while a {{FileOutputStream}} is still open:
> BlobServer#putInputStream():
> {code}
> try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
>     [ ... use fos ... ]
>     // moves file even though fos is still open
>     blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
>     return blobKey;
> } finally {
>     ...
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.

2019-03-07 Thread chunpinghe (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chunpinghe updated FLINK-10884:
---
Comment: was deleted

(was: What's your solution?

YARN checks the physical memory used by a container by default; you can 
disable this check by setting yarn.nodemanager.pmem-check-enabled to false. 
In your example, if your container uses too much off-heap memory (direct 
memory, or JNI malloc) so that the total memory exceeds 3g, the container 
will be killed anyhow.

So, if your container is always killed by the NodeManager, you should check 
whether the total memory you provided for it is insufficient, or whether 
your code has a memory leak (mainly a native memory leak).

 )
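For reference, the physical-memory check mentioned above is a standard Hadoop NodeManager property and is disabled in yarn-site.xml like this (note that disabling it trades container-kill protection for the whole node's stability):

```xml
<property>
  <name>yarn.nodemanager.pmem-check-enabled</name>
  <value>false</value>
</property>
```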

> Flink on yarn  TM container will be killed by nodemanager because of  the 
> exceeded  physical memory.
> 
>
> Key: FLINK-10884
> URL: https://issues.apache.org/jira/browse/FLINK-10884
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, Runtime / Coordination
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Environment: version  : 1.6.2 
> module : flink on yarn
> centos  jdk1.8
> hadoop 2.7
>Reporter: wgcn
>Assignee: wgcn
>Priority: Major
>  Labels: pull-request-available, yarn
>
> The TM container will be killed by the NodeManager because of exceeded 
> physical memory. I found in the launch context for the TM container that 
> "container memory = heap memory + offHeapSizeMB" at the class 
> org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters, 
> lines 160 to 166. I set a safety margin for the whole memory the container 
> uses. For example, if the container limit is 3g of memory, the sum 
> "heap memory + offHeapSizeMB" is set to 2.4g to prevent the container from 
> being killed. Do we have a ready-made solution, or can I commit my solution?
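The safety margin described in the issue can be sketched as follows; the 0.8 factor and the class/method names here are illustrative assumptions, not the actual ContaineredTaskManagerParameters code.

```java
// Sketch of a container-memory safety margin (hypothetical names and factor;
// not the actual ContaineredTaskManagerParameters implementation).
public final class ContainerMemoryMargin {

	/** Fraction of the container limit budgeted for heap + off-heap (assumed value). */
	static final double SAFETY_FRACTION = 0.8;

	/** Returns the budget for "heap memory + offHeapSizeMB" given the container limit. */
	static long usableMemoryMB(long containerLimitMB) {
		// The remainder is reserved for JVM overhead, direct buffers, JNI allocations, etc.
		return (long) (containerLimitMB * SAFETY_FRACTION);
	}

	public static void main(String[] args) {
		// A 3g (3072 MB) container keeps roughly 2.4g for heap + off-heap, as in the example.
		System.out.println(usableMemoryMB(3072)); // prints 2457
	}
}
```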





[jira] [Commented] (FLINK-11860) Remove all the usage of deprecated unit-provided memory options in docs and scripts

2019-03-07 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787629#comment-16787629
 ] 

Yun Tang commented on FLINK-11860:
--

This issue was inspired by the user mailing list thread 
https://lists.apache.org/thread.html/c130622cac22cec96106b5770ad8ec2453f85badf0dd1d00cd9aa420@%3Cuser.flink.apache.org%3E

> Remove all the usage of deprecated unit-provided memory options in docs and 
> scripts
> ---
>
> Key: FLINK-11860
> URL: https://issues.apache.org/jira/browse/FLINK-11860
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Scripts, Documentation
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.9.0
>
>
> Currently, options with a fixed unit, e.g. {{jobmanager.heap.mb}} and 
> {{taskmanager.heap.mb}}, have already been deprecated. However, these options 
> are still shown in the documentation and deployment scripts. We should remove 
> them so as not to confuse users.
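For illustration, the replacement in flink-conf.yaml looks roughly like this, assuming the unit-suffixed `.size` keys that superseded the `.mb` options:

```yaml
# deprecated, unit fixed to megabytes:
# jobmanager.heap.mb: 1024
# taskmanager.heap.mb: 2048

# replacement, with an explicit unit:
jobmanager.heap.size: 1024m
taskmanager.heap.size: 2048m
```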





[GitHub] [flink] JingsongLi commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format

2019-03-07 Thread GitBox
JingsongLi commented on a change in pull request #7913: 
[FLINK-11837][table-runtime-blink] Improve internal data format
URL: https://github.com/apache/flink/pull/7913#discussion_r263688467
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
 ##
 @@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * Join two row to one row.
+ */
+public final class JoinedRow implements BaseRow {
+
+   private BaseRow row1;
+   private BaseRow row2;
+   private byte header;
+
+   public JoinedRow() {}
+
+   public JoinedRow(BaseRow row1, BaseRow row2) {
+   this.row1 = row1;
+   this.row2 = row2;
+   }
+
+   public JoinedRow replace(BaseRow row1, BaseRow row2) {
+   this.row1 = row1;
+   this.row2 = row2;
+   return this;
+   }
+
+   @Override
+   public int getArity() {
+   return row1.getArity() + row2.getArity();
+   }
+
+   @Override
+   public byte getHeader() {
+   return header;
+   }
+
+   @Override
+   public void setHeader(byte header) {
+   this.header = header;
+   }
+
+   @Override
+   public boolean isNullAt(int i) {
+   if (i < row1.getArity()) {
+   return row1.isNullAt(i);
+   } else {
+   return row2.isNullAt(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public boolean getBoolean(int i) {
+   if (i < row1.getArity()) {
+   return row1.getBoolean(i);
+   } else {
+   return row2.getBoolean(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public byte getByte(int i) {
+   if (i < row1.getArity()) {
+   return row1.getByte(i);
+   } else {
+   return row2.getByte(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public short getShort(int i) {
+   if (i < row1.getArity()) {
+   return row1.getShort(i);
+   } else {
+   return row2.getShort(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public int getInt(int i) {
+   if (i < row1.getArity()) {
+   return row1.getInt(i);
+   } else {
+   return row2.getInt(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public long getLong(int i) {
+   if (i < row1.getArity()) {
+   return row1.getLong(i);
+   } else {
+   return row2.getLong(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public float getFloat(int i) {
+   if (i < row1.getArity()) {
+   return row1.getFloat(i);
+   } else {
+   return row2.getFloat(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public double getDouble(int i) {
+   if (i < row1.getArity()) {
+   return row1.getDouble(i);
+   } else {
+   return row2.getDouble(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public char getChar(int i) {
+   if (i < row1.getArity()) {
+   return row1.getChar(i);
+   } else {
+   return row2.getChar(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public Decimal getDecimal(int i, int precision, int scale) {
+   if (i < row1.getArity()) {
+   return row1.getDecimal(i, precision, scale);
+   } else {
+   return row2.getDecimal(i - row1.getArity(), precision, 
scale);
+   }
+   }
+
+   @Override
+   public  BinaryGeneric getGeneric(int i) {
+   if (i < row1.getArity()) {
+   retu

[GitHub] [flink] JingsongLi commented on a change in pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime

2019-03-07 Thread GitBox
JingsongLi commented on a change in pull request #7941: 
[FLINK-11858][table-runtime-blink] Introduce block compression to batch table 
runtime
URL: https://github.com/apache/flink/pull/7941#discussion_r263676627
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/Lz4BlockDecompressor.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.compression;
+
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.util.SafeUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Decode data written with {@link Lz4BlockCompressor}.
+ * It reads from and writes to byte arrays provided from the outside, thus 
reducing copy time.
+ * 
+ * This class is copied and modified from {@link 
net.jpountz.lz4.LZ4BlockInputStream}.
+ */
+public class Lz4BlockDecompressor implements BlockDecompressor {
+
+   private final LZ4FastDecompressor decompressor;
+
+   public Lz4BlockDecompressor() {
+   this.decompressor = 
LZ4Factory.fastestInstance().fastDecompressor();
+   }
+
+   @Override
+   public int decompress(ByteBuffer src, int srcOff, int srcLen, 
ByteBuffer dst, int dstOff)
+   throws DataCorruptionException {
+   final int prevSrcOff = src.position() + srcOff;
+   final int prevDstOff = dst.position() + dstOff;
+
+   final int compressedLen = src.getInt(prevSrcOff);
+   final int originalLen = src.getInt(prevSrcOff + 4);
+   validateLength(compressedLen, originalLen);
+
+   if (dst.capacity() - prevDstOff < originalLen) {
+   throw new InsufficientBufferException("Buffer length 
too small");
+   }
+
+   if (src.limit() - prevSrcOff - 
Lz4BlockCompressionFactory.HEADER_LENGTH < compressedLen) {
 
 Review comment:
   import Lz4BlockCompressionFactory.HEADER_LENGTH?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime

2019-03-07 Thread GitBox
JingsongLi commented on a change in pull request #7941: 
[FLINK-11858][table-runtime-blink] Introduce block compression to batch table 
runtime
URL: https://github.com/apache/flink/pull/7941#discussion_r263687636
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/compression/BlockCompressionTest.java
 ##
 @@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.compression;
+
+import org.junit.Test;
+import sun.misc.Cleaner;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class BlockCompressionTest {
+
+   @Test
+   public void testLz4() {
 
 Review comment:
   test DataCorruptionException & InsufficientBufferException?




[GitHub] [flink] JingsongLi commented on a change in pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime

2019-03-07 Thread GitBox
JingsongLi commented on a change in pull request #7941: 
[FLINK-11858][table-runtime-blink] Introduce block compression to batch table 
runtime
URL: https://github.com/apache/flink/pull/7941#discussion_r263676219
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/BlockCompressionFactory.java
 ##
 @@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.compression;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Each compression codec has a implementation of {@link 
BlockCompressionFactory}
+ * to create compressors and decompressors.
+ */
+public interface BlockCompressionFactory {
+
+   void setConfiguration(Configuration configuration);
 
 Review comment:
   I don't know what this is for. Remove it?




[GitHub] [flink] KurtYoung commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format

2019-03-07 Thread GitBox
KurtYoung commented on a change in pull request #7913: 
[FLINK-11837][table-runtime-blink] Improve internal data format
URL: https://github.com/apache/flink/pull/7913#discussion_r263687900
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
 ##
 @@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * Join two row to one row.
+ */
+public final class JoinedRow implements BaseRow {
+
+   private BaseRow row1;
+   private BaseRow row2;
+   private byte header;
+
+   public JoinedRow() {}
+
+   public JoinedRow(BaseRow row1, BaseRow row2) {
+   this.row1 = row1;
+   this.row2 = row2;
+   }
+
+   public JoinedRow replace(BaseRow row1, BaseRow row2) {
+   this.row1 = row1;
+   this.row2 = row2;
+   return this;
+   }
+
+   @Override
+   public int getArity() {
+   return row1.getArity() + row2.getArity();
+   }
+
+   @Override
+   public byte getHeader() {
+   return header;
+   }
+
+   @Override
+   public void setHeader(byte header) {
+   this.header = header;
+   }
+
+   @Override
+   public boolean isNullAt(int i) {
+   if (i < row1.getArity()) {
+   return row1.isNullAt(i);
+   } else {
+   return row2.isNullAt(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public boolean getBoolean(int i) {
+   if (i < row1.getArity()) {
+   return row1.getBoolean(i);
+   } else {
+   return row2.getBoolean(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public byte getByte(int i) {
+   if (i < row1.getArity()) {
+   return row1.getByte(i);
+   } else {
+   return row2.getByte(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public short getShort(int i) {
+   if (i < row1.getArity()) {
+   return row1.getShort(i);
+   } else {
+   return row2.getShort(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public int getInt(int i) {
+   if (i < row1.getArity()) {
+   return row1.getInt(i);
+   } else {
+   return row2.getInt(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public long getLong(int i) {
+   if (i < row1.getArity()) {
+   return row1.getLong(i);
+   } else {
+   return row2.getLong(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public float getFloat(int i) {
+   if (i < row1.getArity()) {
+   return row1.getFloat(i);
+   } else {
+   return row2.getFloat(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public double getDouble(int i) {
+   if (i < row1.getArity()) {
+   return row1.getDouble(i);
+   } else {
+   return row2.getDouble(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public char getChar(int i) {
+   if (i < row1.getArity()) {
+   return row1.getChar(i);
+   } else {
+   return row2.getChar(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public Decimal getDecimal(int i, int precision, int scale) {
+   if (i < row1.getArity()) {
+   return row1.getDecimal(i, precision, scale);
+   } else {
+   return row2.getDecimal(i - row1.getArity(), precision, 
scale);
+   }
+   }
+
+   @Override
+   public  BinaryGeneric getGeneric(int i) {
+   if (i < row1.getArity()) {
+   retur

[GitHub] [flink] JingsongLi commented on a change in pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime

2019-03-07 Thread GitBox
JingsongLi commented on a change in pull request #7941: 
[FLINK-11858][table-runtime-blink] Introduce block compression to batch table 
runtime
URL: https://github.com/apache/flink/pull/7941#discussion_r263675547
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/Lz4BlockCompressor.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.compression;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Encode data into LZ4 format (not compatible with the LZ4 Frame format).
+ * It reads from and writes to byte arrays provided from the outside, thus 
reducing copy time.
+ * 
+ * This class is copied and modified from {@link 
net.jpountz.lz4.LZ4BlockOutputStream}.
+ */
+public class Lz4BlockCompressor implements BlockCompressor {
+
+   private final LZ4Compressor compressor;
+
+   public Lz4BlockCompressor() {
+   this.compressor = LZ4Factory.fastestInstance().fastCompressor();
+   }
+
+   @Override
+   public int getMaxCompressedSize(int srcSize) {
+   return Lz4BlockCompressionFactory.HEADER_LENGTH + 
compressor.maxCompressedLength(srcSize);
 
 Review comment:
   import Lz4BlockCompressionFactory.HEADER_LENGTH?




[GitHub] [flink] KurtYoung commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format

2019-03-07 Thread GitBox
KurtYoung commented on a change in pull request #7913: 
[FLINK-11837][table-runtime-blink] Improve internal data format
URL: https://github.com/apache/flink/pull/7913#discussion_r263687825
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
 ##
 @@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * Join two row to one row.
+ */
+public final class JoinedRow implements BaseRow {
+
+   private BaseRow row1;
+   private BaseRow row2;
+   private byte header;
+
+   public JoinedRow() {}
+
+   public JoinedRow(BaseRow row1, BaseRow row2) {
+   this.row1 = row1;
+   this.row2 = row2;
+   }
+
+   public JoinedRow replace(BaseRow row1, BaseRow row2) {
+   this.row1 = row1;
+   this.row2 = row2;
+   return this;
+   }
+
+   @Override
+   public int getArity() {
+   return row1.getArity() + row2.getArity();
+   }
+
+   @Override
+   public byte getHeader() {
+   return header;
+   }
+
+   @Override
+   public void setHeader(byte header) {
+   this.header = header;
+   }
+
+   @Override
+   public boolean isNullAt(int i) {
+   if (i < row1.getArity()) {
+   return row1.isNullAt(i);
+   } else {
+   return row2.isNullAt(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public boolean getBoolean(int i) {
+   if (i < row1.getArity()) {
+   return row1.getBoolean(i);
+   } else {
+   return row2.getBoolean(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public byte getByte(int i) {
+   if (i < row1.getArity()) {
+   return row1.getByte(i);
+   } else {
+   return row2.getByte(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public short getShort(int i) {
+   if (i < row1.getArity()) {
+   return row1.getShort(i);
+   } else {
+   return row2.getShort(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public int getInt(int i) {
+   if (i < row1.getArity()) {
+   return row1.getInt(i);
+   } else {
+   return row2.getInt(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public long getLong(int i) {
+   if (i < row1.getArity()) {
+   return row1.getLong(i);
+   } else {
+   return row2.getLong(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public float getFloat(int i) {
+   if (i < row1.getArity()) {
+   return row1.getFloat(i);
+   } else {
+   return row2.getFloat(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public double getDouble(int i) {
+   if (i < row1.getArity()) {
+   return row1.getDouble(i);
+   } else {
+   return row2.getDouble(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public char getChar(int i) {
+   if (i < row1.getArity()) {
+   return row1.getChar(i);
+   } else {
+   return row2.getChar(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public Decimal getDecimal(int i, int precision, int scale) {
+   if (i < row1.getArity()) {
+   return row1.getDecimal(i, precision, scale);
+   } else {
+   return row2.getDecimal(i - row1.getArity(), precision, 
scale);
+   }
+   }
+
+   @Override
+   public  BinaryGeneric getGeneric(int i) {
+   if (i < row1.getArity()) {
+   retur

[GitHub] [flink] QiLuo-BD commented on issue #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)

2019-03-07 Thread GitBox
QiLuo-BD commented on issue #7938: [FLINK-10941] Keep slots which contain 
unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#issuecomment-470832337
 
 
   LGTM




[jira] [Assigned] (FLINK-11860) Remove all the usage of deprecated unit-provided memory options in docs and scripts

2019-03-07 Thread Yun Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-11860:


Assignee: Yun Tang

> Remove all the usage of deprecated unit-provided memory options in docs and 
> scripts
> ---
>
> Key: FLINK-11860
> URL: https://issues.apache.org/jira/browse/FLINK-11860
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Scripts, Documentation
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.9.0
>
>
> Currently, options with a fixed unit, e.g. {{jobmanager.heap.mb}} and 
> {{taskmanager.heap.mb}}, have already been deprecated. However, these options 
> are still shown in the documentation and deployment scripts. We should remove 
> them so as not to confuse users.





[jira] [Created] (FLINK-11860) Remove all the usage of deprecated unit-provided memory options in docs and scripts

2019-03-07 Thread Yun Tang (JIRA)
Yun Tang created FLINK-11860:


 Summary: Remove all the usage of deprecated unit-provided memory 
options in docs and scripts
 Key: FLINK-11860
 URL: https://issues.apache.org/jira/browse/FLINK-11860
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Scripts, Documentation
Reporter: Yun Tang
 Fix For: 1.9.0


Currently, options with a fixed unit, e.g. {{jobmanager.heap.mb}} and 
{{taskmanager.heap.mb}}, have already been deprecated. However, these options 
are still shown in the documentation and deployment scripts. We should remove 
them so as not to confuse users.





[GitHub] [flink] KurtYoung commented on issue #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime

2019-03-07 Thread GitBox
KurtYoung commented on issue #7941: [FLINK-11858][table-runtime-blink] 
Introduce block compression to batch table runtime
URL: https://github.com/apache/flink/pull/7941#issuecomment-470829598
 
 
   cc @JingsongLi 




[GitHub] [flink] klion26 edited a comment on issue #7921: [FLINK-11825][StateBackends] Resolve name clash of StateTTL TimeCharacteristic class

2019-03-07 Thread GitBox
klion26 edited a comment on issue #7921: [FLINK-11825][StateBackends] Resolve 
name clash of StateTTL TimeCharacteristic class
URL: https://github.com/apache/flink/pull/7921#issuecomment-470779712
 
 
   @azagrebin thank you for your review, I've updated the code; PTAL when you 
have time. I did not find any reference to 
`org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic` in 
`docs/dev/stream/state/state.md`; am I missing something?
   
   Hi @rmetzger, it seems flinkbot is offline: azagrebin approved all aspects, 
but flinkbot didn't respond. I found that the same problem also exists on 
other PRs, e.g. #7935 and #7938.




[jira] [Created] (FLINK-11859) Improve SpanningRecordSerializer performance by serializing record length to serialization buffer directly

2019-03-07 Thread Yingjie Cao (JIRA)
Yingjie Cao created FLINK-11859:
---

 Summary: Improve SpanningRecordSerializer performance by 
serializing record length to serialization buffer directly
 Key: FLINK-11859
 URL: https://issues.apache.org/jira/browse/FLINK-11859
 Project: Flink
  Issue Type: Improvement
Reporter: Yingjie Cao
Assignee: Yingjie Cao


In the current implementation of SpanningRecordSerializer, the length of a 
record is serialized to an intermediate length buffer and then copied to the 
target buffer. The length field can instead be serialized directly to the data 
buffer (serializationBuffer), which avoids the copy of the length buffer. 
Though the total number of bytes copied remains unchanged, this eliminates one 
extra copy per record, which incurs high overhead for small records. The 
flink-benchmarks results show it can improve performance; the test results are 
as follows.
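The idea can be sketched as follows. This is only a minimal illustration of the technique, not Flink's actual SpanningRecordSerializer; the class and method names here are hypothetical. Instead of serializing the record length to a separate intermediate buffer and copying it over, the serializer reserves a 4-byte length slot in the data buffer, writes the record payload directly after it, and then backfills the length:

```java
import java.nio.ByteBuffer;

public class LengthPrefixSketch {
    // Serialize a record with its 4-byte length prefix written directly into
    // the target buffer: skip the length slot, write the payload, then
    // backfill the length. No intermediate length buffer is needed.
    public static ByteBuffer serialize(byte[] record) {
        ByteBuffer buf = ByteBuffer.allocate(4 + record.length);
        int lengthPos = buf.position();
        buf.position(lengthPos + 4);          // reserve the length slot
        buf.put(record);                      // payload goes in directly
        buf.putInt(lengthPos, record.length); // backfill the length
        buf.flip();
        return buf;
    }
}
```

The total bytes written are the same as with an intermediate length buffer; what disappears is the per-record copy of the small length buffer into the target buffer.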

Result with the optimization:
|Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: 
channelsFlushTimeout|Param: stateBackend|
|KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2228.049605|77.631804|ops/ms| | |
|KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3968.361739|193.501755|ops/ms| | |
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3030.016702|29.272713|ops/ms|
 |MEMORY|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2754.77678|26.215395|ops/ms|
 |FS|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3001.957606|29.288019|ops/ms|
 |FS_ASYNC|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|123.698984|3.339233|ops/ms|
 |ROCKS|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|126.252137|1.137735|ops/ms|
 |ROCKS_INC|
|SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|323.658098|5.855697|ops/ms|
 | |
|SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|183.34423|3.710787|ops/ms|
 | |
|SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|404.380233|5.131744|ops/ms|
 | |
|SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|527.193369|10.176726|ops/ms|
 | |
|SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|550.073024|11.724412|ops/ms|
 | |
|StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|564.690627|13.766809|ops/ms|
 | |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|49918.11806|2324.234776|ops/ms|100,100ms|
 |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10443.63491|315.835962|ops/ms|100,100ms,SSL|
 |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|21387.47608|2779.832704|ops/ms|1000,1ms|
 |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|26585.85453|860.243347|ops/ms|1000,100ms|
 |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8252.563405|947.129028|ops/ms|1000,100ms,SSL|
 |
|SumLongsBenchmark.benchmarkCount|thrpt|1|30|8806.021402|263.995836|ops/ms| | |
|WindowBenchmarks.globalWindow|thrpt|1|30|4573.620126|112.099391|ops/ms| | |
|WindowBenchmarks.sessionWindow|thrpt|1|30|585.246412|7.026569|ops/ms| | |
|WindowBenchmarks.slidingWindow|thrpt|1|30|449.302134|4.123669|ops/ms| | |
|WindowBenchmarks.tumblingWindow|thrpt|1|30|2979.806858|33.818909|ops/ms| | |
|StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.842865|0.13796|ms/op|
 | |

Result without the optimization:

 
|Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: 
channelsFlushTimeout|Param: stateBackend|
|KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2060.241715|59.898485|ops/ms| | |
|KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3645.306819|223.821719|ops/ms| | |
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2992.698822|36.978115|ops/ms|
 |MEMORY|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2756.10949|27.798937|ops/ms|
 |FS|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2965.969876|44.159793|ops/ms|
 |FS_ASYNC|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|125.506942|1.245978|ops/ms|
 |ROCKS|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|127.258737|1.190588|ops/ms|
 |ROCKS_INC|
|SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|316.497954|8.309241|ops/ms|
 | |
|SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|189.065149|6.302073|ops/ms|
 | |
|SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|391.51305|7.750728|ops/ms|
 | |
|SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|513.611151|10.640899|ops/ms|
 | |
|SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|534.184947|14.370082|ops/ms|
 | |
|StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|483.388618|19.506723|ops/ms|
 | |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|42777.70615|4981.87539|ops/ms|100,100ms|
 |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10201.48525|286.248845|ops/ms|100,100ms,SSL|
 |
|StreamNetworkThr

[jira] [Updated] (FLINK-11696) [checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11696:
---
Labels: pull-request-available  (was: )

> [checkpoint] Avoid to send mkdir requests to DFS from task side
> ---
>
> Key: FLINK-11696
> URL: https://issues.apache.org/jira/browse/FLINK-11696
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
>
> Currently, when we create the checkpoint directory in the distributed file 
> system, not only the {{CheckpointCoordinator}} but also the 
> {{FsCheckpointStorage}} in every {{StreamTask}} creates the 
> {{checkpointsDirectory}}, {{sharedStateDirectory}} and 
> {{taskOwnedStateDirectory}}. These many {{mkdir}} RPC requests put very high 
> pressure on the distributed file system, especially when the parallelism is 
> large or jobs fail over again and again.
> We could avoid these {{mkdir}} requests from the task side when writing to a 
> distributed file system.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Myasuka opened a new pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-07 Thread GitBox
Myasuka opened a new pull request #7942: [FLINK-11696][checkpoint] Avoid to 
send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942
 
 
   ## What is the purpose of the change
   Previously, not only the checkpoint coordinator but also every task sent 
these `mkdir` requests during initialization. This put a lot of pressure on the 
master of the distributed file system.
   This pull request avoids calling the file system's `mkdir` when creating the 
`CheckpointStorage`; instead, it is called only once, when the checkpoint 
coordinator runs `initializeLocationForCheckpoint`. 
   
   ## Brief change log
   
 - Move the `mkdir` operations so that they run lazily on the first 
`initializeLocationForCheckpoint` instead of each time a `CheckpointStorage` is 
created.
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Refactor `testStorageLocationMkdirs` to verify that checkpoint directories 
are created once `initializeLocationForCheckpoint` runs, instead of each time a 
`CheckpointStorage` is created.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
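
   A minimal sketch of the idea, with hypothetical names (these are not the actual Flink classes): the storage constructor only records the path without touching the file system, and the base directories are created exactly once, on the coordinator's first `initializeLocationForCheckpoint` call.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class LazyCheckpointStorage {
    private final Path checkpointsDir;
    private boolean baseDirsCreated = false;

    public LazyCheckpointStorage(Path checkpointsDir) {
        // No mkdir here: constructing the storage on every task stays cheap
        // and sends no RPC to the distributed file system.
        this.checkpointsDir = checkpointsDir;
    }

    // Called by the checkpoint coordinator only; creates the base
    // directories once, on the first checkpoint.
    public synchronized Path initializeLocationForCheckpoint(long checkpointId)
            throws IOException {
        if (!baseDirsCreated) {
            Files.createDirectories(checkpointsDir); // the single mkdir request
            baseDirsCreated = true;
        }
        return checkpointsDir.resolve("chk-" + checkpointId);
    }
}
```

With a parallelism of N, this turns N+1 `mkdir` requests per directory into one.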
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-03-07 Thread GitBox
flinkbot commented on issue #7942: [FLINK-11696][checkpoint] Avoid to send 
mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#issuecomment-470819951
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11858) Introduce block compressor/decompressor for batch table runtime

2019-03-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11858:
---
Labels: pull-request-available  (was: )

> Introduce block compressor/decompressor for batch table runtime
> ---
>
> Key: FLINK-11858
> URL: https://issues.apache.org/jira/browse/FLINK-11858
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Operators
>Reporter: Kurt Young
>Assignee: Kurt Young
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime

2019-03-07 Thread GitBox
flinkbot commented on issue #7941: [FLINK-11858][table-runtime-blink] Introduce 
block compression to batch table runtime
URL: https://github.com/apache/flink/pull/7941#issuecomment-470816714
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung opened a new pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime

2019-03-07 Thread GitBox
KurtYoung opened a new pull request #7941: [FLINK-11858][table-runtime-blink] 
Introduce block compression to batch table runtime
URL: https://github.com/apache/flink/pull/7941
 
 
   
   
   
   ## What is the purpose of the change
   
   Introduce BlockCompressor and BlockDecompressor to batch table runtime for 
future usage.
   
   ## Brief change log
   
   - Add BlockCompressor interface
   - Add BlockDecompressor interface
   - Add LZ4 implementation based on the interfaces
   
   ## Verifying this change
   
   Added unit tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes, lz4-java is 
introduced, which is Apache licensed)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
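
   In the same spirit as the interfaces this PR introduces, a block compressor/decompressor pair can be sketched as below. This is only an illustration with hypothetical names: the PR's actual implementation is LZ4-based via lz4-java, while this sketch uses `java.util.zip` so it stays self-contained. The key property shared with block-compression formats is that decompression needs the original block length from the caller.

```java
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class BlockCompressionSketch {
    // Compress one block and return exactly the compressed bytes.
    public static byte[] compress(byte[] block) {
        Deflater deflater = new Deflater();
        deflater.setInput(block);
        deflater.finish();
        // Headroom follows the usual deflate bound so a single call suffices.
        byte[] out = new byte[block.length + block.length / 1000 + 64];
        int written = deflater.deflate(out);
        deflater.end();
        return Arrays.copyOf(out, written);
    }

    // Decompress one block; the caller must supply the original length,
    // as is typical for block-compression codecs.
    public static byte[] decompress(byte[] compressed, int originalLength)
            throws DataFormatException {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        byte[] out = new byte[originalLength];
        inflater.inflate(out);
        inflater.end();
        return out;
    }
}
```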
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #7940: [hotfix][docs] fix error in functions example

2019-03-07 Thread GitBox
flinkbot commented on issue #7940: [hotfix][docs] fix error in functions 
example 
URL: https://github.com/apache/flink/pull/7940#issuecomment-470805294
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] leesf opened a new pull request #7940: [hotfix][docs] fix error in functions example

2019-03-07 Thread GitBox
leesf opened a new pull request #7940: [hotfix][docs] fix error in functions 
example 
URL: https://github.com/apache/flink/pull/7940
 
 
fix error in functions example.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format

2019-03-07 Thread GitBox
wuchong commented on a change in pull request #7913: 
[FLINK-11837][table-runtime-blink] Improve internal data format
URL: https://github.com/apache/flink/pull/7913#discussion_r263663630
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryGenericTypeInfo.java
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.dataformat.BinaryGeneric;
+import org.apache.flink.table.type.GenericType;
+
+/**
+ * TypeInfo for BaseArray.
 
 Review comment:
   for BinaryGeneric ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format

2019-03-07 Thread GitBox
wuchong commented on a change in pull request #7913: 
[FLINK-11837][table-runtime-blink] Improve internal data format
URL: https://github.com/apache/flink/pull/7913#discussion_r263663580
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryGenericSerializer.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.dataformat.BinaryGeneric;
+import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.type.GenericType;
+import org.apache.flink.table.util.SegmentsUtil;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * Serializer for {@link Decimal}.
 
 Review comment:
   for BinaryGeneric ? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11858) Introduce block compressor/decompressor for batch table runtime

2019-03-07 Thread Kurt Young (JIRA)
Kurt Young created FLINK-11858:
--

 Summary: Introduce block compressor/decompressor for batch table 
runtime
 Key: FLINK-11858
 URL: https://issues.apache.org/jira/browse/FLINK-11858
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Operators
Reporter: Kurt Young
Assignee: Kurt Young






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11857) Introduce BinaryExternalSorter to batch table runtime

2019-03-07 Thread Jingsong Lee (JIRA)
Jingsong Lee created FLINK-11857:


 Summary: Introduce BinaryExternalSorter to batch table runtime
 Key: FLINK-11857
 URL: https://issues.apache.org/jira/browse/FLINK-11857
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Operators
 Environment: We need a sorter to take full advantage of the high 
performance of the Binary format.
Reporter: Jingsong Lee
Assignee: Jingsong Lee






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] TisonKun edited a comment on issue #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint

2019-03-07 Thread GitBox
TisonKun edited a comment on issue #7927: [FLINK-11603][metrics] Port the 
MetricQueryService to the new RpcEndpoint
URL: https://github.com/apache/flink/pull/7927#issuecomment-470792906
 
 
   travis failed on
   
   ```
   02:03:48.550 [ERROR] Failures: 
   02:03:48.550 [ERROR]   YARNHighAvailabilityITCase>YarnTestBase.sleep:236 
There is at least one application on the cluster that is not finished.[App 
application_1552010113856_0001 is in state RUNNING.]
   02:03:48.550 [ERROR] Errors: 
   02:03:48.551 [ERROR] 
org.apache.flink.yarn.YARNHighAvailabilityITCase.testJobRecoversAfterKillingTaskManager(org.apache.flink.yarn.YARNHighAvailabilityITCase)
   02:03:48.551 [ERROR]   Run 1: 
YARNHighAvailabilityITCase.testJobRecoversAfterKillingTaskManager:193->waitUntilJobIsRestarted:336
 » IllegalArgument
   02:03:48.551 [ERROR]   Run 2: 
YARNHighAvailabilityITCase>YarnTestBase.sleep:236 There is at least one 
application on the cluster that is not finished.[App 
application_1552010113856_0001 is in state RUNNING.]
   02:03:48.551 [INFO] 
   ```
   
   However, I cannot reproduce it locally. Since the reported failing point is 
about querying metrics it **might** be relevant, but I see no chance that it 
would cause an `IllegalArgument` exception.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun commented on issue #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint

2019-03-07 Thread GitBox
TisonKun commented on issue #7927: [FLINK-11603][metrics] Port the 
MetricQueryService to the new RpcEndpoint
URL: https://github.com/apache/flink/pull/7927#issuecomment-470792906
 
 
   travis failed on
   
   ```
   02:03:48.550 [ERROR] Failures: 
   02:03:48.550 [ERROR]   YARNHighAvailabilityITCase>YarnTestBase.sleep:236 
There is at least one application on the cluster that is not finished.[App 
application_1552010113856_0001 is in state RUNNING.]
   02:03:48.550 [ERROR] Errors: 
   02:03:48.551 [ERROR] 
org.apache.flink.yarn.YARNHighAvailabilityITCase.testJobRecoversAfterKillingTaskManager(org.apache.flink.yarn.YARNHighAvailabilityITCase)
   02:03:48.551 [ERROR]   Run 1: 
YARNHighAvailabilityITCase.testJobRecoversAfterKillingTaskManager:193->waitUntilJobIsRestarted:336
 » IllegalArgument
   02:03:48.551 [ERROR]   Run 2: 
YARNHighAvailabilityITCase>YarnTestBase.sleep:236 There is at least one 
application on the cluster that is not finished.[App 
application_1552010113856_0001 is in state RUNNING.]
   02:03:48.551 [INFO] 
   ```
   
   However, I cannot reproduce it locally. Since the reported failing point is 
about querying metrics it **might** be relevant, but I see no chance that it 
would cause an `IllegalArgument` exception.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes

2019-03-07 Thread GitBox
JingsongLi commented on a change in pull request #7931: [FLINK-11854] 
[table-planner-blink] Introduce batch physical nodes
URL: https://github.com/apache/flink/pull/7931#discussion_r263655776
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
 ##
 @@ -63,6 +91,22 @@ public static int calculateFixPartSizeInBytes(int arity) {
return calculateBitSetWidthInBytes(arity) + 8 * arity;
}
 
+   /**
+* If it is a fixed-length field, we can call this BinaryRow's setXX 
method for in-place updates.
+* If it is variable-length field, can't use this method, because the 
underlying data is stored continuously.
+*/
+   public static boolean isFixedLength(InternalType type) {
+   if (type instanceof DecimalType) {
+   return ((DecimalType) type).precision() <= 
DecimalType.MAX_COMPACT_PRECISION;
+   } else {
+   return MUTABLE_FIELD_TYPES.contains(type);
+   }
+   }
+
+   public static boolean isMutable(InternalType type) {
+   return MUTABLE_FIELD_TYPES.contains(type) || type instanceof 
DecimalType;
 
 Review comment:
   No matter what the precision is, it is mutable.
   If the precision is compact, the Decimal is stored in 8 bytes in the 
fixed-length part.
   If the precision is not compact, the Decimal is stored in 16 bytes in the 
variable-length part.
   It can always be updated.
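
   The rule discussed here can be illustrated with a small standalone sketch. The constant mirrors `DecimalType.MAX_COMPACT_PRECISION` (assumed here to be 18, i.e. the largest precision that fits in a `long`), and the class and method names are hypothetical:

```java
public class DecimalLayoutSketch {
    // Assumed value of DecimalType.MAX_COMPACT_PRECISION: decimals with
    // precision up to 18 fit in a long.
    static final int MAX_COMPACT_PRECISION = 18;

    // A compact decimal lives in the row's 8-byte fixed-length slot and
    // supports in-place setXX updates; a non-compact one is stored as 16
    // bytes in the variable-length part but, being fixed-size there, is
    // still mutable.
    public static boolean isInFixedLengthPart(int precision) {
        return precision <= MAX_COMPACT_PRECISION;
    }
}
```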


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes

2019-03-07 Thread GitBox
JingsongLi commented on a change in pull request #7931: [FLINK-11854] 
[table-planner-blink] Introduce batch physical nodes
URL: https://github.com/apache/flink/pull/7931#discussion_r263655861
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
 ##
 @@ -63,6 +91,22 @@ public static int calculateFixPartSizeInBytes(int arity) {
return calculateBitSetWidthInBytes(arity) + 8 * arity;
}
 
+   /**
+* If it is a fixed-length field, we can call this BinaryRow's setXX 
method for in-place updates.
+* If it is variable-length field, can't use this method, because the 
underlying data is stored continuously.
+*/
+   public static boolean isFixedLength(InternalType type) {
+   if (type instanceof DecimalType) {
+   return ((DecimalType) type).precision() <= 
DecimalType.MAX_COMPACT_PRECISION;
+   } else {
+   return MUTABLE_FIELD_TYPES.contains(type);
+   }
+   }
+
+   public static boolean isMutable(InternalType type) {
+   return MUTABLE_FIELD_TYPES.contains(type) || type instanceof 
DecimalType;
 
 Review comment:
   The method `isFixedLength` should be renamed to `isInFixedLengthPart`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes

2019-03-07 Thread GitBox
JingsongLi commented on a change in pull request #7931: [FLINK-11854] 
[table-planner-blink] Introduce batch physical nodes
URL: https://github.com/apache/flink/pull/7931#discussion_r263655776
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
 ##
 @@ -63,6 +91,22 @@ public static int calculateFixPartSizeInBytes(int arity) {
return calculateBitSetWidthInBytes(arity) + 8 * arity;
}
 
+   /**
+* If it is a fixed-length field, we can call this BinaryRow's setXX 
method for in-place updates.
+* If it is variable-length field, can't use this method, because the 
underlying data is stored continuously.
+*/
+   public static boolean isFixedLength(InternalType type) {
+   if (type instanceof DecimalType) {
+   return ((DecimalType) type).precision() <= 
DecimalType.MAX_COMPACT_PRECISION;
+   } else {
+   return MUTABLE_FIELD_TYPES.contains(type);
+   }
+   }
+
+   public static boolean isMutable(InternalType type) {
+   return MUTABLE_FIELD_TYPES.contains(type) || type instanceof 
DecimalType;
 
 Review comment:
   No matter what the precision is, it is mutable.
   If the precision is compact, the Decimal is stored in 8 bytes.
   If the precision is not compact, the Decimal is stored in 16 bytes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11856) Introduce BinaryHashTable and LongHashTable to batch table runtime

2019-03-07 Thread Kurt Young (JIRA)
Kurt Young created FLINK-11856:
--

 Summary: Introduce BinaryHashTable and LongHashTable to batch 
table runtime
 Key: FLINK-11856
 URL: https://issues.apache.org/jira/browse/FLINK-11856
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Operators
Reporter: Kurt Young
Assignee: Kurt Young






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wuchong commented on issue #7923: [FLINK-11703][table-planner-blink] Introduce TableEnvironments and support registerDataStream and sqlQuery

2019-03-07 Thread GitBox
wuchong commented on issue #7923: [FLINK-11703][table-planner-blink] Introduce 
TableEnvironments and support registerDataStream and sqlQuery
URL: https://github.com/apache/flink/pull/7923#issuecomment-470792029
 
 
   Commits squashed and re-titled. 




[GitHub] [flink] JingsongLi commented on issue #7913: [FLINK-11837][table-runtime-blink] Improve internal data format

2019-03-07 Thread GitBox
JingsongLi commented on issue #7913: [FLINK-11837][table-runtime-blink] Improve 
internal data format
URL: https://github.com/apache/flink/pull/7913#issuecomment-470791761
 
 
   @KurtYoung Thanks for your suggestion, I have fixed it.




[GitHub] [flink] zhijiangW commented on issue #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions

2019-03-07 Thread GitBox
zhijiangW commented on issue #7186: [FLINK-10941] Keep slots which contain 
unconsumed result partitions
URL: https://github.com/apache/flink/pull/7186#issuecomment-470780791
 
 
   I agree with @azagrebin's suggestion to check the partition registered/released status in the current `NetworkEnvironment#ResultPartitionManager`, which also fits the plan for `ShuffleService`.
   
   It would be better still if we could provide an option supporting two strategies for releasing the TM (waiting for the consumer or not). I have no other concerns. :)




[GitHub] [flink] klion26 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials

2019-03-07 Thread GitBox
klion26 commented on a change in pull request #7895: 
[FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
URL: https://github.com/apache/flink/pull/7895#discussion_r263646747
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
 ##
 @@ -18,25 +18,54 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 
 Review comment:
   @walterddr I'm not sure whether using Mockito will introduce instability into the flink-yarn/flink-yarn-test packages. I saw this happen before, but I cannot remember in which package it was.




[GitHub] [flink] klion26 commented on issue #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class

2019-03-07 Thread GitBox
klion26 commented on issue #7921: [FLINK-11825][StateBackends] Resolve name 
class of StateTTL TimeCharacteristic class
URL: https://github.com/apache/flink/pull/7921#issuecomment-470779712
 
 
   @azagrebin thank you for your review. I've updated the code; PTAL when you have time.
   
   Hi @rmetzger, it seems flinkbot is offline: @azagrebin approved all aspects, but flinkbot didn't respond. I found that the problem also exists on other PRs, e.g. #7935 and #7938.




[GitHub] [flink] KurtYoung commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes

2019-03-07 Thread GitBox
KurtYoung commented on a change in pull request #7931: [FLINK-11854] 
[table-planner-blink] Introduce batch physical nodes
URL: https://github.com/apache/flink/pull/7931#discussion_r263639880
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
 ##
 @@ -63,6 +91,22 @@ public static int calculateFixPartSizeInBytes(int arity) {
return calculateBitSetWidthInBytes(arity) + 8 * arity;
}
 
+	/**
+	 * If it is a fixed-length field, we can call this BinaryRow's setXX methods for in-place updates.
+	 * If it is a variable-length field, we can't use this method, because the underlying data is stored contiguously.
+	 */
+	public static boolean isFixedLength(InternalType type) {
+		if (type instanceof DecimalType) {
+			return ((DecimalType) type).precision() <= DecimalType.MAX_COMPACT_PRECISION;
+		} else {
+			return MUTABLE_FIELD_TYPES.contains(type);
+		}
+	}
+
+	public static boolean isMutable(InternalType type) {
+		return MUTABLE_FIELD_TYPES.contains(type) || type instanceof DecimalType;
+	}
 
 Review comment:
   do we also need to check Decimal's precision just like `isFixedLength` does?




[GitHub] [flink] KurtYoung commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes

2019-03-07 Thread GitBox
KurtYoung commented on a change in pull request #7931: [FLINK-11854] 
[table-planner-blink] Introduce batch physical nodes
URL: https://github.com/apache/flink/pull/7931#discussion_r263641774
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExpandUtil.scala
 ##
 @@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexNode}
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/**
+  * Utility methods for expand operator.
+  */
+object ExpandUtil {
+
+  def projectsToString(
 
 Review comment:
   Can we try to move all methods like these into one util class? Various logical and physical operators will share some name-formatting utilities, and there will be a lot of repetition if we organize them by operator.




[GitHub] [flink] JingsongLi commented on issue #7913: [FLINK-11837][table-runtime-blink] Improve internal data format

2019-03-07 Thread GitBox
JingsongLi commented on issue #7913: [FLINK-11837][table-runtime-blink] Improve 
internal data format
URL: https://github.com/apache/flink/pull/7913#issuecomment-470774081
 
 
   > BTW, `NestedRow` has not been covered by any test case
   
   Yes, I will add tests for NestedRow, JoinedRow, BoxedWrapperRow and GenericRow.




[jira] [Assigned] (FLINK-11852) Improve Processing function example

2019-03-07 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-11852:


Assignee: TANG Wen-hui

> Improve Processing function example
> ---
>
> Key: FLINK-11852
> URL: https://issues.apache.org/jira/browse/FLINK-11852
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.7.2
>Reporter: Flavio Pompermaier
>Assignee: TANG Wen-hui
>Priority: Minor
>
> In the processing function documentation 
> ([https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html)]
>  there's an "abusive" usage of timers, since a new timer is registered for 
> every incoming tuple. This could cause problems in terms of allocated 
> objects and could burden the overall application.
> It could be worth mentioning this problem and removing useless timers, e.g.:
>  
> {code:java}
> CountWithTimestamp current = state.value();
> if (current == null) {
>      current = new CountWithTimestamp();
>      current.key = value.f0;
>  } else {
>     ctx.timerService().deleteEventTimeTimer(current.lastModified + timeout);
>  }{code}
>  



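The fix quoted in the issue above deletes the key's previous timer before registering a replacement. Below is a minimal, dependency-free Java model of that bookkeeping (all names are hypothetical; a real job would call `ctx.timerService().deleteEventTimeTimer(...)` and `registerEventTimeTimer(...)` inside a `KeyedProcessFunction`):

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the fix: keep at most one pending timer per key. */
public class TimerDedupSketch {
    private final Map<String, Long> pendingTimerPerKey = new HashMap<>();
    private final long timeout;
    private int registrations = 0;
    private int deletions = 0;

    public TimerDedupSketch(long timeout) {
        this.timeout = timeout;
    }

    /** Called once per element: delete the key's previous timer, then register the new one. */
    public void onElement(String key, long elementTimestamp) {
        Long previous = pendingTimerPerKey.get(key);
        if (previous != null) {
            deletions++;      // stands in for deleteEventTimeTimer(previous)
        }
        long newTimer = elementTimestamp + timeout;
        registrations++;      // stands in for registerEventTimeTimer(newTimer)
        pendingTimerPerKey.put(key, newTimer);
    }

    public int pendingCount() { return pendingTimerPerKey.size(); }
    public int registrations() { return registrations; }
    public int deletions() { return deletions; }

    public static void main(String[] args) {
        TimerDedupSketch sketch = new TimerDedupSketch(60_000L);
        for (long t = 0; t < 5; t++) {
            sketch.onElement("tenant-1", t * 1_000L); // five elements for one key
        }
        // Without the delete, five timers would pile up; with it, only one remains.
        System.out.println(sketch.pendingCount());
    }
}
```

Running `main` shows a single pending timer after five elements for the same key, which is exactly the allocation saving the issue asks the documentation to mention.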


[jira] [Commented] (FLINK-11819) Additional attribute for order by not support by flink sql, but told supported in doc

2019-03-07 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787428#comment-16787428
 ] 

Hequn Cheng commented on FLINK-11819:
-

[~hustclf] Thanks for bringing up the discussion.
[~xccui][~hustclf] How about updating the doc to make it clearer? The explanation from [~xccui] is a good example.

Best, Hequn

> Additional attribute for order by not support by flink sql, but told 
> supported in doc
> -
>
> Key: FLINK-11819
> URL: https://issues.apache.org/jira/browse/FLINK-11819
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.7.2
>Reporter: Lifei Chen
>Assignee: Lifei Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3
>
>   Original Estimate: 3h
>  Time Spent: 20m
>  Remaining Estimate: 2h 40m
>
> I am using Flink v1.7.1. When I use Flink SQL to order by an attribute (not a 
> time attribute), the error log is as follows.
>  
> sql:
> {quote}"SELECT * FROM events order by tenantId"
> {quote}
>  
> error logs:
> {quote}Exception in thread "main" org.apache.flink.table.api.TableException: 
> Cannot generate a valid execution plan for the given query:
> FlinkLogicalSort(sort0=[$2], dir0=[ASC])
>  FlinkLogicalNativeTableScan(table=[[_DataStreamTable_0]])
> This exception indicates that the query uses an unsupported SQL feature.
>  Please check the documentation for the set of currently supported SQL 
> features.
>  at 
> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:377)
>  at 
> org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:814)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
>  at 
> org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:305)
>  at 
> org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:248)
> {quote}
>  
> So as of now, only the time attribute is supported by Flink for `order 
> by`; additional attributes are not supported yet. Is that right?
> If so, there is a mistake in the doc, which indicates that attributes other 
> than the time attribute are supported.
> related links: 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sql.html#orderby--limit]





[jira] [Updated] (FLINK-10935) Implement KubeClient with Faric8 Kubernetes clients

2019-03-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10935:
---
Labels: pull-request-available  (was: )

> Implement KubeClient with Faric8 Kubernetes clients 
> 
>
> Key: FLINK-10935
> URL: https://issues.apache.org/jira/browse/FLINK-10935
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: JIN SUN
>Assignee: Chunhui Shi
>Priority: Major
>  Labels: pull-request-available
>
> Implement KubeClient with Fabric8 Kubernetes clients and add tests





[GitHub] [flink] chunhui-shi commented on issue #7939: [FLINK-10935][ResourceManager] Implement KubeClient with Fabric8 Kubenetes clients

2019-03-07 Thread GitBox
chunhui-shi commented on issue #7939: [FLINK-10935][ResourceManager] Implement 
KubeClient with Fabric8 Kubenetes clients
URL: https://github.com/apache/flink/pull/7939#issuecomment-470764389
 
 
   This is the second pull request coming after #7844.




[GitHub] [flink] chunhui-shi opened a new pull request #7939: [FLINK-10935][ResourceManager] Implement KubeClient with Fabric8 Kubenetes clients

2019-03-07 Thread GitBox
chunhui-shi opened a new pull request #7939: [FLINK-10935][ResourceManager] 
Implement KubeClient with Fabric8 Kubenetes clients
URL: https://github.com/apache/flink/pull/7939
 
 
   ## What is the purpose of the change
   As part of the native Kubernetes integration, introduce a client wrapper to access the Kubernetes service.
   
   ## Brief change log
   * Implement the KubeClient interface and an actual implementation.
   * Add a mock kube server derived from Fabric8's mock implementation.
   * Add unit tests for KubeClient.
   
   ## Does this pull request potentially affect one of the following parts:
   * Dependencies (does it add or upgrade a dependency): (no)
   * The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
   * The serializers: (no)
   * The runtime per-record code paths (performance sensitive): (no)
   * Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
   * The S3 file system connector: (no)
   
   ## Documentation
   * Does this pull request introduce a new feature? (yes)
   * If yes, how is the feature documented? (the documentation will be introduced in later pull requests)




[GitHub] [flink] flinkbot commented on issue #7939: [FLINK-10935][ResourceManager] Implement KubeClient with Fabric8 Kubenetes clients

2019-03-07 Thread GitBox
flinkbot commented on issue #7939: [FLINK-10935][ResourceManager] Implement 
KubeClient with Fabric8 Kubenetes clients
URL: https://github.com/apache/flink/pull/7939#issuecomment-470764365
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Commented] (FLINK-9685) Flink should support hostname-substitution for security.kerberos.login.principal

2019-03-07 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787326#comment-16787326
 ] 

Rong Rong commented on FLINK-9685:
--

Hi. I was wondering whether I fully understood the requirement of this JIRA:
 1. Do we want to support general substitution of principal keywords, e.g. REALM as well?
 2. What kind of principal format are we going to support?
 3. What would be a good reserved keyword for this substitution?

Can we refer to the Kerberos documentation for possible improvements regarding 2/3?

Regarding the implementation, should we defer this change, or move the replacement 
logic into the specific module instead, for example {{HadoopModule}}? It seems 
to me that the only places that should use the principal are the security 
modules/contexts themselves, and each module might interpret it differently. 
This is actually part of the overall effort to provide a ServiceProvider pattern 
in FLINK-11589.

> Flink should support hostname-substitution for 
> security.kerberos.login.principal
> 
>
> Key: FLINK-9685
> URL: https://issues.apache.org/jira/browse/FLINK-9685
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Ethan Li
>Assignee: Aleksandr Salatich
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java#L83]
>  
> We can have something like this
> {code:java}
> String rawPrincipal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
> if (rawPrincipal != null) {
>    try {
>       rawPrincipal = rawPrincipal.replace("HOSTNAME", InetAddress.getLocalHost().getCanonicalHostName());
>    } catch (UnknownHostException e) {
>       LOG.error("Failed to replace HOSTNAME with the local hostname.", e);
>    }
> }
> this.principal = rawPrincipal;
> {code}
> So it will be easier to deploy Flink to a cluster: instead of setting a 
> different principal on every node, we can use the same principal 
> headless_user/HOSTNAME@DOMAIN.
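A hedged sketch of the keyword substitution proposed above (the helper class and keyword set are hypothetical; the actual change would live in `SecurityConfiguration` or, per the comment earlier in the thread, in the specific security module). Replacement values are passed in explicitly so the logic stays testable; the real code would resolve HOSTNAME via `InetAddress.getLocalHost().getCanonicalHostName()`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: expand reserved keywords (HOSTNAME, REALM, ...) in a Kerberos principal. */
public class PrincipalSubstitutionSketch {

    static String substitute(String rawPrincipal, Map<String, String> replacements) {
        if (rawPrincipal == null) {
            return null; // no principal configured, nothing to substitute
        }
        String principal = rawPrincipal;
        for (Map.Entry<String, String> e : replacements.entrySet()) {
            principal = principal.replace(e.getKey(), e.getValue());
        }
        return principal;
    }

    public static void main(String[] args) {
        Map<String, String> repl = new LinkedHashMap<>();
        repl.put("HOSTNAME", "worker-01.example.com"); // hypothetical resolved hostname
        repl.put("REALM", "EXAMPLE.COM");              // hypothetical default realm
        System.out.println(substitute("headless_user/HOSTNAME@REALM", repl));
        // -> headless_user/worker-01.example.com@EXAMPLE.COM
    }
}
```

Keeping the keyword-to-value map open-ended would also answer question 1 above: adding REALM support is just another map entry.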





[jira] [Updated] (FLINK-11848) Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTIION

2019-03-07 Thread David Anderson (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Anderson updated FLINK-11848:
---
Description: 
Recently we have been running some streaming jobs with Apache Flink. There are 
multiple Kafka topics named with a format like xx_yy-mm-dd. We used a topic regex 
pattern to let a consumer consume those topics. However, if we delete some older 
topics, the metadata in the consumer does not seem to update properly, so it 
still remembers those outdated topics in its topic list, which leads to 
*UNKNOWN_TOPIC_EXCEPTION*. We must restart the consumer job to recover. This 
seems to occur in the producer as well. Any idea how to solve this problem? Thank 
you very much!
  
  

  was:
Recently we are doing some streaming jobs with apache flink. There are multiple 
KAFKA topics with a format as xx_yy-mm-dd. We used a topic regex pattern to 
let a consumer to consume those topics. However, if we delete some older 
topics, it seems that the metadata in consumer does not update properly so It 
still remember those outdated topic in its topic list, which leads to 
*UNKNOWN_TOPIC_EXCEPTIOIN*. We must restart the consumer job to recovery. It 
seems to occur in producer as well. Any idea to solve this problem? Thank you 
very much!
 
 


> Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTIION
> 
>
> Key: FLINK-11848
> URL: https://issues.apache.org/jira/browse/FLINK-11848
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.6.4
>Reporter: Shengnan YU
>Priority: Major
>
> Recently we are doing some streaming jobs with apache flink. There are 
> multiple KAFKA topics with a format as xx_yy-mm-dd. We used a topic regex 
> pattern to let a consumer to consume those topics. However, if we delete some 
> older topics, it seems that the metadata in consumer does not update properly 
> so It still remember those outdated topic in its topic list, which leads to 
> *UNKNOWN_TOPIC_EXCEPTION*. We must restart the consumer job to recovery. It 
> seems to occur in producer as well. Any idea to solve this problem? Thank you 
> very much!
>   
>   





[jira] [Closed] (FLINK-11851) ClusterEntrypoint provides wrong executor to HaServices

2019-03-07 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-11851.
-
Resolution: Fixed

Fixed via
master: 3c7ed148cf39fa81a18832f7774365d78c3af08c
1.8.0: b7d85c87195200d5cdccb30160d7f3cecd980de8
1.7.3: 215204ca2298eb6c2d71baef5d3ea1dce17d368c

> ClusterEntrypoint provides wrong executor to HaServices
> ---
>
> Key: FLINK-11851
> URL: https://issues.apache.org/jira/browse/FLINK-11851
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The {{ClusterEntrypoint}} provides the executor of the common {{RpcService}} 
> to the {{HighAvailabilityServices}}, which uses that executor to run I/O 
> operations. In I/O-heavy cases, this can block all {{RpcService}} threads and 
> make the {{RpcEndpoints}} running in the respective {{RpcService}} 
> unresponsive.
> I suggest introducing a dedicated I/O executor for I/O-heavy operations.
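The proposed fix can be sketched without any Flink dependencies (all names here are hypothetical; the real change wires an extra executor through `ClusterEntrypoint` into the HA services): blocking I/O work goes to a dedicated thread pool so the RPC threads stay responsive.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Sketch: separate executors for RPC dispatch and blocking I/O. */
public class DedicatedIoExecutorSketch {
    private final ExecutorService rpcExecutor = Executors.newFixedThreadPool(2);
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

    /** Short, non-blocking RPC work runs on the RPC pool. */
    Future<String> handleRpc() {
        return rpcExecutor.submit(() -> "rpc-ok");
    }

    /** Potentially slow I/O (e.g. HA metadata reads) runs on its own pool. */
    Future<String> runIo() {
        return ioExecutor.submit(() -> {
            Thread.sleep(50); // stand-in for a blocking filesystem/ZooKeeper call
            return "io-ok";
        });
    }

    void shutdown() {
        rpcExecutor.shutdown();
        ioExecutor.shutdown();
    }

    public static void main(String[] args) throws Exception {
        DedicatedIoExecutorSketch s = new DedicatedIoExecutorSketch();
        Future<String> io = s.runIo();    // slow work starts on the I/O pool
        String rpc = s.handleRpc().get(); // RPC still answers promptly
        System.out.println(rpc + " / " + io.get());
        s.shutdown();
    }
}
```

With a single shared pool, enough concurrent `runIo` calls would occupy every thread and stall `handleRpc`; splitting the pools removes that interference.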





[GitHub] [flink] asfgit merged pull request #7926: [BP-1.7][FLINK-11851] Introduce dedicated io executor for ClusterEntrypoint and MiniCluster

2019-03-07 Thread GitBox
asfgit merged pull request #7926: [BP-1.7][FLINK-11851] Introduce dedicated io 
executor for ClusterEntrypoint and MiniCluster
URL: https://github.com/apache/flink/pull/7926
 
 
   




[jira] [Resolved] (FLINK-11846) Duplicate job submission delete HA files

2019-03-07 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-11846.
---
   Resolution: Fixed
Fix Version/s: (was: 1.8.0)

Fixed via
master: f2b0e494e6089ccda488d6fa69cdb135a4213cf9
1.8.0: 813286bc3b27068aeb7c8dc2b98b4b6c96fbc8fb

> Duplicate job submission delete HA files
> 
>
> Key: FLINK-11846
> URL: https://issues.apache.org/jira/browse/FLINK-11846
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Due to changes for FLINK-11383, the {{Dispatcher}} now deletes HA files if the 
> client submits a job twice. A duplicate job submission should, however, 
> simply be rejected and not cause HA files to be deleted.





[jira] [Resolved] (FLINK-11850) ZooKeeperHaServicesTest#testSimpleCloseAndCleanupAllData fails on Travis

2019-03-07 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-11850.
---
Resolution: Fixed

Fixed via
master: b464df2e5728bebee45dde7410ddb4f5ff1e6e2f
1.8.0: 2f765e7b47f9acfd7c3f538cbdb5e560a2f41a3b

> ZooKeeperHaServicesTest#testSimpleCloseAndCleanupAllData fails on Travis
> 
>
> Key: FLINK-11850
> URL: https://issues.apache.org/jira/browse/FLINK-11850
> Project: Flink
>  Issue Type: Test
>  Components: Runtime / Coordination, Tests
>Reporter: Congxian Qiu(klion26)
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: pull-request-available, test-stability
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServicesTest
> 08:20:01.694 [ERROR] 
> testSimpleCloseAndCleanupAllData(org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServicesTest)
>  Time elapsed: 0.076 s <<< ERROR!
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for 
> /foo/bar/flink/default/leaderlatch/resource_manager_lock/_c_477d0124-92f3-4069-98aa-a71b8243250c-latch-00
>  at 
> org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServicesTest.runCleanupTest(ZooKeeperHaServicesTest.java:203)
>  at 
> org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServicesTest.testSimpleCloseAndCleanupAllData(ZooKeeperHaServicesTest.java:128)
>  
> Travis links: https://travis-ci.org/apache/flink/jobs/502960186





[GitHub] [flink] asfgit merged pull request #7925: [BP-1.8][FLINK-11851] Introduce dedicated io executor for ClusterEntrypoint and MiniCluster

2019-03-07 Thread GitBox
asfgit merged pull request #7925: [BP-1.8][FLINK-11851] Introduce dedicated io 
executor for ClusterEntrypoint and MiniCluster
URL: https://github.com/apache/flink/pull/7925
 
 
   




[GitHub] [flink] asfgit merged pull request #7919: [BP-1.8][FLINK-11846] Don't delete HA job files in case of duplicate job submission

2019-03-07 Thread GitBox
asfgit merged pull request #7919: [BP-1.8][FLINK-11846] Don't delete HA job 
files in case of duplicate job submission
URL: https://github.com/apache/flink/pull/7919
 
 
   




[GitHub] [flink] asfgit merged pull request #7929: [BP-1.8][FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode

2019-03-07 Thread GitBox
asfgit merged pull request #7929: [BP-1.8][FLINK-11850][zk] Tolerate concurrent 
child deletions when deleting owned zNode
URL: https://github.com/apache/flink/pull/7929
 
 
   




[GitHub] [flink] asfgit merged pull request #7924: [FLINK-11851] Introduce dedicated io executor for ClusterEntrypoint and MiniCluster

2019-03-07 Thread GitBox
asfgit merged pull request #7924: [FLINK-11851] Introduce dedicated io executor 
for ClusterEntrypoint and MiniCluster
URL: https://github.com/apache/flink/pull/7924
 
 
   




[GitHub] [flink] asfgit merged pull request #7928: [FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode

2019-03-07 Thread GitBox
asfgit merged pull request #7928: [FLINK-11850][zk] Tolerate concurrent child 
deletions when deleting owned zNode
URL: https://github.com/apache/flink/pull/7928
 
 
   




[GitHub] [flink] asfgit merged pull request #7918: [FLINK-11846] Don't delete HA job files in case of duplicate job submission

2019-03-07 Thread GitBox
asfgit merged pull request #7918: [FLINK-11846] Don't delete HA job files in 
case of duplicate job submission
URL: https://github.com/apache/flink/pull/7918
 
 
   




[GitHub] [flink] pnowojski commented on a change in pull request #7713: [FLINK-10995][network] Copy intermediate serialization results only once for broadcast mode

2019-03-07 Thread GitBox
pnowojski commented on a change in pull request #7713: [FLINK-10995][network] 
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r263563494
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 ##
 @@ -44,4 +52,67 @@ public BroadcastRecordWriter(
public void emit(T record) throws IOException, InterruptedException {
broadcastEmit(record);
}
+
+   @Override
+   public void broadcastEmit(T record) throws IOException, 
InterruptedException {
 
 Review comment:
   > Considering initiating BufferConsumer#currentReaderPosition to 
LatencyMarker.size() for other channels in BroadcastRecordWriter#randomEmit, I 
meant this way of getting marker size seems coupled with LatencyMarker 
currently. 
   
   We don't have to hardcode `LatencyMarker.size()`. After all, when we are 
about to set the offset, we have just serialized the record for `randomEmit()` 
into a fresh `BufferBuilder` :)




[GitHub] [flink] walterddr commented on a change in pull request #7860: [FLINK-9685] Adding support hostname-substitution for security.kerberos login.principal

2019-03-07 Thread GitBox
walterddr commented on a change in pull request #7860: [FLINK-9685] Adding 
support hostname-substitution for security.kerberos login.principal
URL: https://github.com/apache/flink/pull/7860#discussion_r263552559
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java
 ##
 @@ -80,13 +86,21 @@ public SecurityConfiguration(Configuration flinkConf,
List securityModuleFactories) {
this.isZkSaslDisable = 
flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
this.keytab = 
flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
-   this.principal = 
flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
this.useTicketCache = 
flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
this.loginContextNames = 
parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
this.zkServiceName = 
flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
this.zkLoginContextName = 
flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
this.securityModuleFactories = 
Collections.unmodifiableList(securityModuleFactories);
this.flinkConfig = checkNotNull(flinkConf);
+   String rawPrincipal = 
flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
 
 Review comment:
   I am not sure this will work. I think we will have to make this part of 
the `getPrincipal()` method, because it will be called via 
`SecurityUtils.install` on the host, the JM and the TM?




[GitHub] [flink] walterddr commented on a change in pull request #7860: [FLINK-9685] Adding support hostname-substitution for security.kerberos login.principal

2019-03-07 Thread GitBox
walterddr commented on a change in pull request #7860: [FLINK-9685] Adding 
support hostname-substitution for security.kerberos login.principal
URL: https://github.com/apache/flink/pull/7860#discussion_r263553448
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java
 ##
 @@ -80,13 +86,21 @@ public SecurityConfiguration(Configuration flinkConf,
List securityModuleFactories) {
this.isZkSaslDisable = 
flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
this.keytab = 
flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
-   this.principal = 
flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
this.useTicketCache = 
flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
this.loginContextNames = 
parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
this.zkServiceName = 
flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
this.zkLoginContextName = 
flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
this.securityModuleFactories = 
Collections.unmodifiableList(securityModuleFactories);
this.flinkConfig = checkNotNull(flinkConf);
+   String rawPrincipal = 
flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
+   if (rawPrincipal != null) {
+   try {
+   rawPrincipal = rawPrincipal.replace("HOSTNAME", 
InetAddress.getLocalHost().getCanonicalHostName());
 
 Review comment:
   In order to make the replacement dynamic, I think we have to have some 
indicator of which of the 3 Kerberos principal forms is used:
   ```
   user_name@REALM
   host/host_name@REALM
   service_name/host_name@REALM
   ```
   and if we use `"HOSTNAME"` as a reserved keyword, we also need to update the 
documentation as well
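   A minimal, self-contained sketch of the substitution being discussed, 
assuming `HOSTNAME` is adopted as the reserved placeholder; the class and 
method names below are illustrative, not part of Flink:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Illustrative helper: resolves a reserved "HOSTNAME" placeholder in a
// Kerberos principal at configuration load time. Not the actual Flink code.
final class PrincipalResolver {

    static String resolve(String rawPrincipal) {
        if (rawPrincipal == null || !rawPrincipal.contains("HOSTNAME")) {
            // nothing to substitute (covers the plain user_name@REALM form)
            return rawPrincipal;
        }
        try {
            String host = InetAddress.getLocalHost().getCanonicalHostName();
            return rawPrincipal.replace("HOSTNAME", host);
        } catch (UnknownHostException e) {
            // fall back to the unresolved principal if the host is unknown
            return rawPrincipal;
        }
    }
}
```

   With this shape, `host/HOSTNAME@REALM` and `service_name/HOSTNAME@REALM` 
both resolve without having to distinguish the three principal forms, since 
principals without the placeholder pass through untouched.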




[jira] [Commented] (FLINK-11798) Incorrect Kubernetes Documentation

2019-03-07 Thread Alex (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787142#comment-16787142
 ] 

Alex commented on FLINK-11798:
--

[~pritesh-patel], unfortunately the docker entry point of the public Flink 1.7 
docker images doesn't pass additional arguments to the Task Manager.
I wasn't aware of it, as I was using 
https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh
 as the reference.

There is a [PR|https://github.com/docker-flink/docker-flink/pull/68] to allow 
passing additional arguments to the Task Manager.
But for the described option to take effect, either the Flink 1.7.2 docker 
images need to be rebuilt and retagged, or the updated scripts have to wait to 
be included in some later Flink 1.7.x release.

> Incorrect Kubernetes Documentation
> --
>
> Key: FLINK-11798
> URL: https://issues.apache.org/jira/browse/FLINK-11798
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.7.2
>Reporter: Pritesh Patel
>Assignee: Alex
>Priority: Major
>
> I have been trying to use the kubernetes session cluster manifests provided 
> in the documentation. The -Dtaskmanager.host flag doesn't seem to pass 
> through, meaning it uses the pod name as the host name. This won't work.
> The current docs state the args should be:
>  
> {code:java}
> args: 
> - taskmanager 
> - "-Dtaskmanager.host=$(K8S_POD_IP)"
> {code}
>  
> I did manage to get it to work by using this manifest for the taskmanager 
> instead. This wasted a lot of time, as it was very hard to find.
> {code:java}
> args: 
> - taskmanager.sh
> - -Dtaskmanager.host=$(K8S_POD_IP)
> - -Djobmanager.rpc.address=$(JOB_MANAGER_RPC_ADDRESS) 
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] walterddr commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials

2019-03-07 Thread GitBox
walterddr commented on a change in pull request #7895: 
[FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
URL: https://github.com/apache/flink/pull/7895#discussion_r263529807
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
 ##
 @@ -58,4 +87,84 @@ public void testDeleteApplicationFiles() throws Exception {
assertThat(files.count(), equalTo(0L));
}
}
+
+   @Test
+   public void testCreateTaskExecutorContext() throws Exception {
 
 Review comment:
   This is quite involved. I was wondering if it would be better to have a 
separate util method to test just this specific piece of removing 
AM_RM_TOKEN from the credentials, and/or move it to the `flink-yarn-test` 
package for more comprehensive testing?
   e.g. add another assertion:
   ```
Assert.assertThat(
"AMRMToken", taskManagerRunsWithKerberos, 
Matchers.is(false));
   ```
   in `YARNSessionFIFOSecuredITCase` ??





[GitHub] [flink] walterddr commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials

2019-03-07 Thread GitBox
walterddr commented on a change in pull request #7895: 
[FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
URL: https://github.com/apache/flink/pull/7895#discussion_r263528664
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
 ##
 @@ -565,7 +565,20 @@ static ContainerLaunchContext createTaskExecutorContext(
new File(fileLocation),

HadoopUtils.getHadoopConfiguration(flinkConfig));
 
-   cred.writeTokenStorageToStream(dob);
+   // Filter out AMRMToken before setting the 
tokens to the TaskManager container context.
+   Method getAllTokensMethod = 
Credentials.class.getMethod("getAllTokens");
+   Credentials taskManagerCred = new Credentials();
+   final Text amRmTokenKind = new 
Text("YARN_AM_RM_TOKEN");
 
 Review comment:
   Can we refactor `YARN_AM_RM_TOKEN` into a `static final` constant of the 
Utils class? It will also be used in tests.
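   As a simplified model of the change under review, the filtering step can be 
sketched with the token kind as a `static final` constant; a plain map stands 
in for Hadoop's `Credentials`/`Token` classes, so the names below are 
illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model: drop the AM/RM token before handing credentials to the
// TaskManager container. Real code operates on
// org.apache.hadoop.security.Credentials, not a plain map.
final class TokenFilter {

    static final String AM_RM_TOKEN_KIND = "YARN_AM_RM_TOKEN";

    static Map<String, byte[]> filterAmRmToken(Map<String, byte[]> allTokens) {
        Map<String, byte[]> filtered = new HashMap<>();
        for (Map.Entry<String, byte[]> entry : allTokens.entrySet()) {
            // copy every token except the AM/RM token kind
            if (!AM_RM_TOKEN_KIND.equals(entry.getKey())) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }
}
```

   Keeping the kind string in one constant lets both the production path and 
the test assert against the same value.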




[GitHub] [flink] walterddr commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials

2019-03-07 Thread GitBox
walterddr commented on a change in pull request #7895: 
[FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
URL: https://github.com/apache/flink/pull/7895#discussion_r263529202
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
 ##
 @@ -18,25 +18,54 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 
 Review comment:
   Just curious, @klion26: is there any concrete discussion showing that 
mockito introduces instability in the flink-yarn / flink-yarn-test packages?




[GitHub] [flink] tillrohrmann commented on issue #7928: [FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode

2019-03-07 Thread GitBox
tillrohrmann commented on issue #7928: [FLINK-11850][zk] Tolerate concurrent 
child deletions when deleting owned zNode
URL: https://github.com/apache/flink/pull/7928#issuecomment-470638988
 
 
   Thanks for the review @TisonKun and @zentol. Addressing Chesnay's last 
comment and then merging this PR.




[GitHub] [flink] tillrohrmann commented on a change in pull request #7928: [FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode

2019-03-07 Thread GitBox
tillrohrmann commented on a change in pull request #7928: [FLINK-11850][zk] 
Tolerate concurrent child deletions when deleting owned zNode
URL: https://github.com/apache/flink/pull/7928#discussion_r263508836
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
 ##
 @@ -251,7 +251,20 @@ private void cleanupZooKeeperPaths() throws Exception {
 
private void deleteOwnedZNode() throws Exception {
// delete the HA_CLUSTER_ID znode which is owned by this cluster
-   client.delete().deletingChildrenIfNeeded().forPath("/");
+
+   // Since we are using Curator version 2.12 there is a bug in 
deleting the children
+   // if there is a concurrent delete operation. Therefore we need 
to add this retry
+   // logic. See https://issues.apache.org/jira/browse/CURATOR-430 
for more information.
+   // The retry logic can be removed once we upgrade to Curator 
version >= 4.0.1.
+   boolean zNodeDeleted = false;
+   while (!zNodeDeleted) {
+   try {
+   
client.delete().deletingChildrenIfNeeded().forPath("/");
+   zNodeDeleted = true;
+   } catch (KeeperException.NoNodeException ignored) {
+   // concurrent delete operation. Try again.
 
 Review comment:
   True, will add it.
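   The retry pattern from the diff can be sketched in isolation as follows; 
`NoSuchElementException` stands in for Curator's 
`KeeperException.NoNodeException`, and the helper name is illustrative, not 
the actual Flink API:

```java
import java.util.concurrent.Callable;

// Illustrative sketch of "retry the recursive delete on a concurrent child
// deletion" (CURATOR-430 workaround). Only the no-node case is swallowed;
// every other exception still propagates to the caller.
final class RetryingDelete {

    static int deleteWithRetry(Callable<Void> deleteOp) throws Exception {
        int attempts = 0;
        boolean deleted = false;
        while (!deleted) {
            attempts++;
            try {
                deleteOp.call(); // may race with a concurrent delete
                deleted = true;
            } catch (java.util.NoSuchElementException ignored) {
                // stands in for KeeperException.NoNodeException: a concurrent
                // operation deleted a child under us; simply try again
            }
        }
        return attempts;
    }
}
```

   The loop only tolerates the concurrent-deletion case, so real failures are 
not masked, which matches the intent of the fix in the diff.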




[GitHub] [flink] tillrohrmann commented on issue #7924: [FLINK-11851] Introduce dedicated io executor for ClusterEntrypoint and MiniCluster

2019-03-07 Thread GitBox
tillrohrmann commented on issue #7924: [FLINK-11851] Introduce dedicated io 
executor for ClusterEntrypoint and MiniCluster
URL: https://github.com/apache/flink/pull/7924#issuecomment-470637680
 
 
   Thanks for the review @StefanRRichter and @jgrier. Merging this PR.




[GitHub] [flink] tillrohrmann commented on issue #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall

2019-03-07 Thread GitBox
tillrohrmann commented on issue #7935: [FLINK-11855] Fix race condition in 
EmbeddedLeaderService#GrantLeadershipCall
URL: https://github.com/apache/flink/pull/7935#issuecomment-470634979
 
 
   Thanks for the review @zentol. I think you are right that we should also fix 
the `RevokeLeadershipCall`. I've added a fixup and addressed your comments.




[GitHub] [flink] connord-stripe commented on issue #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API …

2019-03-07 Thread GitBox
connord-stripe commented on issue #7168: [FLINK-6756][DataStream API] Provide 
Rich AsyncFunction to Scala API …
URL: https://github.com/apache/flink/pull/7168#issuecomment-470629861
 
 
   Neat. Thanks!




[GitHub] [flink] azagrebin commented on issue #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)

2019-03-07 Thread GitBox
azagrebin commented on issue #7938: [FLINK-10941] Keep slots which contain 
unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#issuecomment-470629769
 
 
   cc @QiLuo-BD @zhijiangW @pnowojski 




[GitHub] [flink] azagrebin commented on issue #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions

2019-03-07 Thread GitBox
azagrebin commented on issue #7186: [FLINK-10941] Keep slots which contain 
unconsumed result partitions
URL: https://github.com/apache/flink/pull/7186#issuecomment-470629679
 
 
   I agree with @zhijiangW. 
   
   We also discussed it with @pnowojski. We might not merge it into 1.8.0 right 
away, because we want to give it some exposure in running tests for a couple of 
weeks, but the fix is still useful for subsequent minor releases of 1.8 and 
e.g. 1.7.
   
   Thinking more about delaying the release to the moment of channel closing in 
the happy scenario: when isEndOfPartitionEvent comes, there will probably be 
not much left to close apart from file handles, which is cheap. Buffers are 
also pretty much flushed to netty and either already freed by netty or will be. 
Sorry for the confusion; the current approach in the PR is actually simpler 
than introducing one more `isClosed` state.
   
   @QiLuo-BD 
   I would suggest a couple more changes apart from using `Future` in the 
task executor gateway, as commented by @zhijiangW.
   
   The task executor gateway could use `NetworkEnvironment.PartitionManager` to 
check for still registered and unreleased partitions, because it is the central 
point for managing partitions. It would also simplify how this is queried now, 
going through `TaskSlot` and lingering `Tasks`. Eventually, `NetworkEnvironment` 
becomes `ShuffleService`, which could decide whether the producer can be 
released, one way or another.
   
   We still make the producer task executor depend on another consumer task 
executor; although we do not see any problems with it now, I think it is still 
safer to introduce an option as a feature flag. By default, the task executor 
will wait for consumers, as this PR suggests, but users can always fall back to 
the previous behaviour using the option.
   
   I created a co-authored PR #7938 on top of your PR with those changes 
rebased on the fresh master. You could review it and then we decide further.




[GitHub] [flink] azagrebin commented on issue #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)

2019-03-07 Thread GitBox
azagrebin commented on issue #7938: [FLINK-10941] Keep slots which contain 
unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#issuecomment-470629074
 
 
   @flinkbot attention @pnowojski 




[GitHub] [flink] flinkbot commented on issue #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)

2019-03-07 Thread GitBox
flinkbot commented on issue #7938: [FLINK-10941] Keep slots which contain 
unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#issuecomment-470628733
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] azagrebin opened a new pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)

2019-03-07 Thread GitBox
azagrebin opened a new pull request #7938: [FLINK-10941] Keep slots which 
contain unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938
 
 
   ## What is the purpose of the change
   
   More changes based on #7186.
   
   ## Brief change log
   
   The task executor gateway can use `NetworkEnvironment.PartitionManager` to 
check for still registered and unreleased partitions, because it is the central 
point for managing partitions. It would also simplify how this is queried now, 
going through `TaskSlot` and lingering `Tasks`. Eventually, `NetworkEnvironment` 
becomes `ShuffleService`, which could decide whether the producer can be 
released, one way or another.
   
   We still make the producer task executor depend on another consumer task 
executor; although we do not see any problems with it now, I think it is still 
safer to introduce an option as a feature flag. By default, the task executor 
will wait for consumers, as this PR suggests, but users can always fall back to 
the previous behaviour using the option.
   
   ## Verifying this change
   
   unit tests, e2e tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[GitHub] [flink] azagrebin commented on issue #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class

2019-03-07 Thread GitBox
azagrebin commented on issue #7921: [FLINK-11825][StateBackends] Resolve name 
class of StateTTL TimeCharacteristic class
URL: https://github.com/apache/flink/pull/7921#issuecomment-470619215
 
 
   @flinkbot approve all




[GitHub] [flink] tillrohrmann commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall

2019-03-07 Thread GitBox
tillrohrmann commented on a change in pull request #7935: [FLINK-11855] Fix 
race condition in EmbeddedLeaderService#GrantLeadershipCall
URL: https://github.com/apache/flink/pull/7935#discussion_r263487660
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
 ##
 @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Tests for the {@link EmbeddedLeaderService}.
+ */
+public class EmbeddedLeaderServiceTest extends TestLogger {
+
+   /**
+* Tests that the {@link EmbeddedLeaderService} can handle a concurrent 
grant
+* leadership call and a shutdown.
+*/
+   @Test
+   public void testConcurrentGrantLeadershipAndShutdown() throws Exception 
{
+   final ManuallyTriggeredScheduledExecutor executor = new 
ManuallyTriggeredScheduledExecutor();
+   final EmbeddedLeaderService embeddedLeaderService = new 
EmbeddedLeaderService(executor);
+
+   try {
+   final LeaderElectionService leaderElectionService = 
embeddedLeaderService.createLeaderElectionService();
+
+   final ArrayBlockingQueue offeredSessionIds = new 
ArrayBlockingQueue<>(1);
+   final TestingLeaderContender contender = new 
TestingLeaderContender(offeredSessionIds);
+
+   leaderElectionService.start(contender);
+   leaderElectionService.stop();
+
+   if (executor.numQueuedRunnables() > 0) {
 
 Review comment:
   But I think you are right that we could do this check a little bit better.




[GitHub] [flink] tillrohrmann commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall

2019-03-07 Thread GitBox
tillrohrmann commented on a change in pull request #7935: [FLINK-11855] Fix 
race condition in EmbeddedLeaderService#GrantLeadershipCall
URL: https://github.com/apache/flink/pull/7935#discussion_r263483901
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
 ##
 @@ -506,33 +515,28 @@ public void run() {
 
	private static class GrantLeadershipCall implements Runnable {

-		private final EmbeddedLeaderElectionService leaderElectionService;
+		private final LeaderContender contender;
		private final UUID leaderSessionId;
		private final Logger logger;

		GrantLeadershipCall(
-			EmbeddedLeaderElectionService leaderElectionService,
+			LeaderContender contender,
			UUID leaderSessionId,
			Logger logger) {

-			this.leaderElectionService = checkNotNull(leaderElectionService);
+			this.contender = checkNotNull(contender);
			this.leaderSessionId = checkNotNull(leaderSessionId);
			this.logger = checkNotNull(logger);
		}

		@Override
		public void run() {
-			leaderElectionService.isLeader = true;
-
-			final LeaderContender contender = leaderElectionService.contender;
-
			try {
				contender.grantLeadership(leaderSessionId);
			}
			catch (Throwable t) {
				logger.warn("Error granting leadership to contender", t);
				contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
-				leaderElectionService.isLeader = false;
 
 Review comment:
   I kept the flag because if we would remove it, then a user of the `EmbeddedLeaderElectionService#hasLeadership` could get a false positive if it guessed the current leadership id by chance.

   It is true that we don't reset the `isLeader` flag to false in case of a failure. The assumption is that the `leaderContender` will stop the leader election service in case of a failure.
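A hedged sketch of the check this flag guards. The field and method names mirror the quoted diff, but this standalone class is hypothetical, not the actual `EmbeddedLeaderElectionService`:

```java
import java.util.UUID;

/** Hypothetical sketch of the leadership check that the isLeader flag guards. */
class LeadershipState {
    volatile boolean isLeader;
    volatile UUID currentLeaderSessionId;

    boolean hasLeadership(UUID leaderSessionId) {
        // Without the isLeader flag, a caller that guessed the current
        // session id by chance would get a false positive.
        return isLeader && leaderSessionId.equals(currentLeaderSessionId);
    }
}
```

This illustrates why the session id alone is not sufficient while a grant is still in flight.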




[GitHub] [flink] tillrohrmann commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall

2019-03-07 Thread GitBox
tillrohrmann commented on a change in pull request #7935: [FLINK-11855] Fix 
race condition in EmbeddedLeaderService#GrantLeadershipCall
URL: https://github.com/apache/flink/pull/7935#discussion_r263482371
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
 ##
 @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Tests for the {@link EmbeddedLeaderService}.
+ */
+public class EmbeddedLeaderServiceTest extends TestLogger {
+
+   /**
+    * Tests that the {@link EmbeddedLeaderService} can handle a concurrent grant
+    * leadership call and a shutdown.
+    */
+   @Test
+   public void testConcurrentGrantLeadershipAndShutdown() throws Exception {
+       final ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor();
+       final EmbeddedLeaderService embeddedLeaderService = new EmbeddedLeaderService(executor);
+
+       try {
+           final LeaderElectionService leaderElectionService = embeddedLeaderService.createLeaderElectionService();
+
+           final ArrayBlockingQueue<UUID> offeredSessionIds = new ArrayBlockingQueue<>(1);
+           final TestingLeaderContender contender = new TestingLeaderContender(offeredSessionIds);
+
+           leaderElectionService.start(contender);
+           leaderElectionService.stop();
+
+           if (executor.numQueuedRunnables() > 0) {
 
 Review comment:
   There are no safeguards since it is actually an implementation detail how the `LeaderContender` is granted leadership. Also that `start` will directly send a `GrantLeadershipCall` is an implementation detail on which we rely by not having this check. Strictly speaking also this check does not guarantee that if the `ManuallyTriggeredScheduledExecutor` has a runnable enqueued that this is the `GrantLeadershipCall` runnable. Therefore, I opted for this compromise.




[GitHub] [flink] tillrohrmann commented on issue #7918: [FLINK-11846] Don't delete HA job files in case of duplicate job submission

2019-03-07 Thread GitBox
tillrohrmann commented on issue #7918: [FLINK-11846] Don't delete HA job files 
in case of duplicate job submission
URL: https://github.com/apache/flink/pull/7918#issuecomment-470611600
 
 
   Thanks for the review @TisonKun and @StefanRRichter. Merging this PR.




[GitHub] [flink] zentol commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall

2019-03-07 Thread GitBox
zentol commented on a change in pull request #7935: [FLINK-11855] Fix race 
condition in EmbeddedLeaderService#GrantLeadershipCall
URL: https://github.com/apache/flink/pull/7935#discussion_r263465103
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
 ##
 @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha.embedded;
+
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Tests for the {@link EmbeddedLeaderService}.
+ */
+public class EmbeddedLeaderServiceTest extends TestLogger {
+
+   /**
+    * Tests that the {@link EmbeddedLeaderService} can handle a concurrent grant
+    * leadership call and a shutdown.
+    */
+   @Test
+   public void testConcurrentGrantLeadershipAndShutdown() throws Exception {
+       final ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor();
+       final EmbeddedLeaderService embeddedLeaderService = new EmbeddedLeaderService(executor);
+
+       try {
+           final LeaderElectionService leaderElectionService = embeddedLeaderService.createLeaderElectionService();
+
+           final ArrayBlockingQueue<UUID> offeredSessionIds = new ArrayBlockingQueue<>(1);
+           final TestingLeaderContender contender = new TestingLeaderContender(offeredSessionIds);
+
+           leaderElectionService.start(contender);
+           leaderElectionService.stop();
+
+           if (executor.numQueuedRunnables() > 0) {
 
 Review comment:
   what safeguards are in place to ensure that this is usually true?
   I'd prefer introducing a CountdownLatch into the 
`ManuallyTriggeredScheduledExecutor` to allow waiting for X runnables.
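The suggested latch-based waiting could look roughly like this. `AwaitableManualExecutor` is a hypothetical stand-in written for illustration, not the actual `ManuallyTriggeredScheduledExecutor` API:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;

/** Hypothetical sketch: a manually triggered executor that lets a test await N queued runnables. */
class AwaitableManualExecutor {
    private final Queue<Runnable> queued = new ArrayDeque<>();
    private final CountDownLatch latch;

    AwaitableManualExecutor(int expectedRunnables) {
        this.latch = new CountDownLatch(expectedRunnables);
    }

    /** Enqueues the runnable without running it and counts down the latch. */
    synchronized void execute(Runnable runnable) {
        queued.add(runnable);
        latch.countDown();
    }

    /** Blocks until the expected number of runnables has been queued. */
    void awaitQueuedRunnables() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Runs all currently queued runnables. */
    synchronized void triggerAll() {
        Runnable next;
        while ((next = queued.poll()) != null) {
            next.run();
        }
    }
}
```

A test would then call `awaitQueuedRunnables()` instead of checking `numQueuedRunnables() > 0`, removing the race the reviewer points out.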




[GitHub] [flink] zentol commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall

2019-03-07 Thread GitBox
zentol commented on a change in pull request #7935: [FLINK-11855] Fix race 
condition in EmbeddedLeaderService#GrantLeadershipCall
URL: https://github.com/apache/flink/pull/7935#discussion_r263479048
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
 ##
 @@ -506,33 +515,28 @@ public void run() {
 
	private static class GrantLeadershipCall implements Runnable {

-		private final EmbeddedLeaderElectionService leaderElectionService;
+		private final LeaderContender contender;
		private final UUID leaderSessionId;
		private final Logger logger;

		GrantLeadershipCall(
-			EmbeddedLeaderElectionService leaderElectionService,
+			LeaderContender contender,
			UUID leaderSessionId,
			Logger logger) {

-			this.leaderElectionService = checkNotNull(leaderElectionService);
+			this.contender = checkNotNull(contender);
			this.leaderSessionId = checkNotNull(leaderSessionId);
			this.logger = checkNotNull(logger);
		}

		@Override
		public void run() {
-			leaderElectionService.isLeader = true;
-
-			final LeaderContender contender = leaderElectionService.contender;
-
			try {
				contender.grantLeadership(leaderSessionId);
			}
			catch (Throwable t) {
				logger.warn("Error granting leadership to contender", t);
				contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t));
-				leaderElectionService.isLeader = false;
 
 Review comment:
   The flag is no longer reset if the grant fails.
   
   I'm wondering whether we need this flag at all. It's only being used in 
`EmbeddedLeaderElectionService#hasLeadership`, but shouldn't the 
`leaderSessionId` be sufficient for checking that? This seems especially true 
now that `updateLeader` updates both `isLeader` and `currentLeaderSessionId` 
right after another.




[GitHub] [flink] flinkbot commented on issue #7937: [BP-1.7][FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall

2019-03-07 Thread GitBox
flinkbot commented on issue #7937: [BP-1.7][FLINK-11855] Fix race condition in 
EmbeddedLeaderService#GrantLeadershipCall
URL: https://github.com/apache/flink/pull/7937#issuecomment-470601948
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] tillrohrmann opened a new pull request #7937: [BP-1.7][FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall

2019-03-07 Thread GitBox
tillrohrmann opened a new pull request #7937: [BP-1.7][FLINK-11855] Fix race 
condition in EmbeddedLeaderService#GrantLeadershipCall
URL: https://github.com/apache/flink/pull/7937
 
 
   Backport of #7935 to `release-1.7`.




[GitHub] [flink] flinkbot commented on issue #7936: [BP-1.8][FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall

2019-03-07 Thread GitBox
flinkbot commented on issue #7936: [BP-1.8][FLINK-11855] Fix race condition in 
EmbeddedLeaderService#GrantLeadershipCall
URL: https://github.com/apache/flink/pull/7936#issuecomment-470601303
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] tillrohrmann opened a new pull request #7936: [BP-1.8][FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall

2019-03-07 Thread GitBox
tillrohrmann opened a new pull request #7936: [BP-1.8][FLINK-11855] Fix race 
condition in EmbeddedLeaderService#GrantLeadershipCall
URL: https://github.com/apache/flink/pull/7936
 
 
   Backport of #7935 to `release-1.8`.




[GitHub] [flink] flinkbot commented on issue #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall

2019-03-07 Thread GitBox
flinkbot commented on issue #7935: [FLINK-11855] Fix race condition in 
EmbeddedLeaderService#GrantLeadershipCall
URL: https://github.com/apache/flink/pull/7935#issuecomment-470597573
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-11855) Race condition in EmbeddedLeaderService

2019-03-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11855:
---
Labels: pull-request-available  (was: )

> Race condition in EmbeddedLeaderService
> ---
>
> Key: FLINK-11855
> URL: https://issues.apache.org/jira/browse/FLINK-11855
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.8.0
>
>
> There is a race condition in the {{EmbeddedLeaderService}} which can occur if 
> the {{EmbeddedLeaderService}} is shut down before the {{GrantLeadershipCall}} 
> has been executed. In this case, the {{contender}} is nulled which leads to a 
> NPE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tillrohrmann opened a new pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall

2019-03-07 Thread GitBox
tillrohrmann opened a new pull request #7935: [FLINK-11855] Fix race condition 
in EmbeddedLeaderService#GrantLeadershipCall
URL: https://github.com/apache/flink/pull/7935
 
 
   ## What is the purpose of the change
   
   Fix the race condition between executing 
EmbeddedLeaderService#GrantLeadershipCall
   and a concurrent shutdown of the leader service by making 
GrantLeadershipCall not
   accessing mutable state outside of a lock.
   
   ## Verifying this change
   
   - Added `EmbeddedLeaderServiceTest#testConcurrentGrantLeadershipAndShutdown`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[GitHub] [flink] zentol commented on a change in pull request #7928: [FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode

2019-03-07 Thread GitBox
zentol commented on a change in pull request #7928: [FLINK-11850][zk] Tolerate 
concurrent child deletions when deleting owned zNode
URL: https://github.com/apache/flink/pull/7928#discussion_r263450942
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
 ##
 @@ -251,7 +251,20 @@ private void cleanupZooKeeperPaths() throws Exception {
 
private void deleteOwnedZNode() throws Exception {
// delete the HA_CLUSTER_ID znode which is owned by this cluster
-   client.delete().deletingChildrenIfNeeded().forPath("/");
+
+   // Since we are using Curator version 2.12 there is a bug in 
deleting the children
+   // if there is a concurrent delete operation. Therefore we need 
to add this retry
+   // logic. See https://issues.apache.org/jira/browse/CURATOR-430 
for more information.
+   // The retry logic can be removed once we upgrade to Curator 
version >= 4.0.1.
+   boolean zNodeDeleted = false;
+   while (!zNodeDeleted) {
+   try {
+   
client.delete().deletingChildrenIfNeeded().forPath("/");
+   zNodeDeleted = true;
+   } catch (KeeperException.NoNodeException ignored) {
+   // concurrent delete operation. Try again.
 
 Review comment:
   we could log this on debug just in case.
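The retry pattern in the quoted diff can be shown self-contained. The real Curator client and `LOG` are not available here, so this sketch simulates the concurrent `NoNodeException` with a stub delete; the loop shape matches the quoted code:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Self-contained sketch of the retry-on-NoNode deletion pattern from the quoted diff. */
class RetryingDeleter {
    static class NoNodeException extends Exception {}

    private final AtomicInteger attempts = new AtomicInteger();
    private final int failuresBeforeSuccess;

    RetryingDeleter(int failuresBeforeSuccess) {
        this.failuresBeforeSuccess = failuresBeforeSuccess;
    }

    // Stand-in for client.delete().deletingChildrenIfNeeded().forPath("/"),
    // which can throw NoNodeException under concurrent child deletions (CURATOR-430).
    private void deleteOnce() throws NoNodeException {
        if (attempts.incrementAndGet() <= failuresBeforeSuccess) {
            throw new NoNodeException();
        }
    }

    /** Retries until the delete succeeds; returns the number of attempts made. */
    int deleteWithRetry() {
        boolean zNodeDeleted = false;
        while (!zNodeDeleted) {
            try {
                deleteOnce();
                zNodeDeleted = true;
            } catch (NoNodeException ignored) {
                // A real implementation would LOG.debug(...) here,
                // as the reviewer suggests, before retrying.
            }
        }
        return attempts.get();
    }
}
```

With two simulated concurrent deletions the loop succeeds on the third attempt.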




[jira] [Assigned] (FLINK-11798) Incorrect Kubernetes Documentation

2019-03-07 Thread Alex (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex reassigned FLINK-11798:


Assignee: Alex

> Incorrect Kubernetes Documentation
> --
>
> Key: FLINK-11798
> URL: https://issues.apache.org/jira/browse/FLINK-11798
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes
>Affects Versions: 1.7.2
>Reporter: Pritesh Patel
>Assignee: Alex
>Priority: Major
>
> I have been trying to use the kubernetes session cluster manifests provided 
> in the documentation. The -Dtaskmanager.host flag doesn't seem to pass 
> through, meaning it uses the pod name as the host name. This won't work.
> The current docs state the args should be:
>  
> {code:java}
> args: 
> - taskmanager 
> - "-Dtaskmanager.host=$(K8S_POD_IP)"
> {code}
>  
> I did manage to get it to work by using this manifest for the taskmanager 
> instead. This did waste a lot of time as it was very hard to find.
> {code:java}
> args: 
> - taskmanager.sh
> - -Dtaskmanager.host=$(K8S_POD_IP)
> - -Djobmanager.rpc.address=$(JOB_MANAGER_RPC_ADDRESS) 
> {code}





[GitHub] [flink] desiam commented on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] Expose RestHighLevelClient to allow for custom sniffing

2019-03-07 Thread GitBox
desiam commented on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] 
Expose RestHighLevelClient to allow for custom sniffing
URL: https://github.com/apache/flink/pull/7861#issuecomment-470579574
 
 
   @tzulitai If it's difficult to expose the RestHighLevelClient, we may look 
into creating our own implementation of the Elasticsearch 6 ElasticsearchSink.




[jira] [Updated] (FLINK-11855) Race condition in EmbeddedLeaderService

2019-03-07 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-11855:
--
Priority: Major  (was: Critical)

> Race condition in EmbeddedLeaderService
> ---
>
> Key: FLINK-11855
> URL: https://issues.apache.org/jira/browse/FLINK-11855
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.8.0
>
>
> There is a race condition in the {{EmbeddedLeaderService}} which can occur if 
> the {{EmbeddedLeaderService}} is shut down before the {{GrantLeadershipCall}} 
> has been executed. In this case, the {{contender}} is nulled which leads to a 
> NPE.





[jira] [Created] (FLINK-11855) Race condition in EmbeddedLeaderService

2019-03-07 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-11855:
-

 Summary: Race condition in EmbeddedLeaderService
 Key: FLINK-11855
 URL: https://issues.apache.org/jira/browse/FLINK-11855
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.7.2, 1.8.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.7.3, 1.8.0


There is a race condition in the {{EmbeddedLeaderService}} which can occur if 
the {{EmbeddedLeaderService}} is shut down before the {{GrantLeadershipCall}} 
has been executed. In this case, the {{contender}} is nulled which leads to a 
NPE.





[GitHub] [flink] azagrebin commented on a change in pull request #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class

2019-03-07 Thread GitBox
azagrebin commented on a change in pull request #7921: 
[FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic 
class
URL: https://github.com/apache/flink/pull/7921#discussion_r263432284
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 ##
 @@ -71,32 +73,61 @@
}
 
/**
+* @deprecated use {@link TtlTimeCharacteristic} to avoid class name 
clash
+* with org.apache.flink.streaming.api.TimeCharacteristic.
+*
 * This option configures time scale to use for ttl.
 */
+   @Deprecated
public enum TimeCharacteristic {
/** Processing time, see also 
TimeCharacteristic.ProcessingTime. */
ProcessingTime
}
 
+   /**
+* This option configures time scale to use for ttl.
+*/
+   public enum TtlTimeCharacteristic {
+   /** Processing time, see also 
org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime. 
*/
+   ProcessingTime
+   }
+
private final UpdateType updateType;
private final StateVisibility stateVisibility;
-   private final TimeCharacteristic timeCharacteristic;
+   @Deprecated
+   private TimeCharacteristic timeCharacteristic;
+   private TtlTimeCharacteristic ttlTimeCharacteristic;
private final Time ttl;
private final CleanupStrategies cleanupStrategies;
 
+   @Deprecated
 
 Review comment:
   this is private, it can be just removed




[GitHub] [flink] azagrebin commented on a change in pull request #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class

2019-03-07 Thread GitBox
azagrebin commented on a change in pull request #7921: 
[FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic 
class
URL: https://github.com/apache/flink/pull/7921#discussion_r263434298
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 ##
 @@ -205,15 +247,26 @@ public Builder neverReturnExpired() {
 *
 * @param timeCharacteristic The time characteristic configures 
time scale to use for ttl.
 */
+   @Deprecated
 
 Review comment:
   when we deprecate, we should also say what to use instead in doc comment, 
like `FoldingState`.




[GitHub] [flink] azagrebin commented on a change in pull request #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class

2019-03-07 Thread GitBox
azagrebin commented on a change in pull request #7921: 
[FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic 
class
URL: https://github.com/apache/flink/pull/7921#discussion_r263432469
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 ##
 @@ -71,32 +73,61 @@
}
 
/**
+* @deprecated use {@link TtlTimeCharacteristic} to avoid class name 
clash
+* with org.apache.flink.streaming.api.TimeCharacteristic.
+*
 * This option configures time scale to use for ttl.
 */
+   @Deprecated
public enum TimeCharacteristic {
/** Processing time, see also 
TimeCharacteristic.ProcessingTime. */
ProcessingTime
}
 
+   /**
+* This option configures time scale to use for ttl.
+*/
+   public enum TtlTimeCharacteristic {
+   /** Processing time, see also 
org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime. 
*/
+   ProcessingTime
+   }
+
private final UpdateType updateType;
private final StateVisibility stateVisibility;
-   private final TimeCharacteristic timeCharacteristic;
+   @Deprecated
+   private TimeCharacteristic timeCharacteristic;
 
 Review comment:
   could we get rid of this field, also in Builder? `TimeCharacteristic` can be 
always converted into `TtlTimeCharacteristic` and back in relevant methods.
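The suggested conversion could be a simple exhaustive mapping. The nested enums below are minimal stand-ins for the real `StateTtlConfig` enums, written only to illustrate the idea:

```java
/** Hypothetical sketch of converting the deprecated enum into its replacement. */
class TtlTimeCharacteristicConversion {
    enum TimeCharacteristic { ProcessingTime }      // deprecated name (stand-in)
    enum TtlTimeCharacteristic { ProcessingTime }   // replacement (stand-in)

    static TtlTimeCharacteristic fromDeprecated(TimeCharacteristic tc) {
        switch (tc) {
            case ProcessingTime:
                return TtlTimeCharacteristic.ProcessingTime;
            default:
                throw new IllegalArgumentException("Unknown time characteristic: " + tc);
        }
    }
}
```

Converting in the relevant builder methods would let the deprecated field be dropped, as the reviewer suggests.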



