[jira] [Commented] (FLINK-11261) BlobServer moves file with open OutputStream
[ https://issues.apache.org/jira/browse/FLINK-11261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787627#comment-16787627 ]

vinoyang commented on FLINK-11261:
----------------------------------

Hi [~Zentol], what do you think of this change?

{code:java}
try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
	// read stream
	byte[] buf = new byte[BUFFER_SIZE];
	while (true) {
		final int bytesRead = inputStream.read(buf);
		if (bytesRead == -1) {
			// done
			break;
		}
		fos.write(buf, 0, bytesRead);
		md.update(buf, 0, bytesRead);
	}
} finally {
	try {
		blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
		return blobKey;
	} finally {
		// delete incomingFile from a failed download
		if (!incomingFile.delete() && incomingFile.exists()) {
			LOG.warn("Could not delete the staging file {} for blob key {} and job {}.",
				incomingFile, blobKey, jobId);
		}
	}
}
{code}

> BlobServer moves file with open OutputStream
> --------------------------------------------
>
>                 Key: FLINK-11261
>                 URL: https://issues.apache.org/jira/browse/FLINK-11261
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Operators
>    Affects Versions: 1.8.0
>            Reporter: Chesnay Schepler
>            Assignee: vinoyang
>            Priority: Major
>             Fix For: 1.8.0
>
> Various tests fail on Windows because the BlobServer attempts to move a file
> while a {{FileOutputStream}} is still open:
> BlobServer#putInputStream():
> {code}
> try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
>     [ ... use fos ... ]
>     // moves file even though fos is still open
>     blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
>     return blobKey;
> } finally {
>     ...
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
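The Windows failure comes from moving a file whose FileOutputStream is still open. A minimal sketch of the fix direction proposed above — close the stream first, then move — using hypothetical names (StagingMoveSketch, store), not the actual BlobServer API:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/**
 * Sketch of the fix direction for FLINK-11261 (names are hypothetical, not
 * the actual BlobServer API): fully close the output stream before moving
 * the staging file, so the move also succeeds on Windows.
 */
public final class StagingMoveSketch {

	/** Writes {@code in} to a staging file, closes it, then moves it into place. */
	public static Path store(InputStream in, Path stagingFile, Path storageFile) throws IOException {
		try (OutputStream out = Files.newOutputStream(stagingFile)) {
			in.transferTo(out);
		} // the stream is closed here, before the move below
		try {
			Files.move(stagingFile, storageFile, StandardCopyOption.REPLACE_EXISTING);
			return storageFile;
		} finally {
			// clean up the staging file if the move failed; no-op otherwise
			Files.deleteIfExists(stagingFile);
		}
	}
}
```

Moving a closed file works on all platforms, whereas renaming a file with an open handle fails on Windows.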
[jira] [Issue Comment Deleted] (FLINK-10884) Flink on yarn TM container will be killed by nodemanager because of the exceeded physical memory.
[ https://issues.apache.org/jira/browse/FLINK-10884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

chunpinghe updated FLINK-10884:
-------------------------------
    Comment: was deleted

(was: What's your solution? YARN checks the physical memory used by a container by default; you can disable this by setting {{yarn.nodemanager.pmem-check-enabled}} to false. In your example, if the container uses too much off-heap memory (direct memory, or JNI malloc) so that the total memory exceeds 3g, the container will be killed anyhow. So, if your container keeps being killed by the NodeManager, you should check whether the total memory you provided for it is insufficient, or whether your code has a memory leak (mainly a native memory leak).)

> Flink on yarn TM container will be killed by nodemanager because of the
> exceeded physical memory.
> -----------------------------------------------------------------------
>
>                 Key: FLINK-10884
>                 URL: https://issues.apache.org/jira/browse/FLINK-10884
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN, Runtime / Coordination
>    Affects Versions: 1.5.5, 1.6.2, 1.7.0
>         Environment: version : 1.6.2
> module : flink on yarn
> centos jdk1.8
> hadoop 2.7
>            Reporter: wgcn
>            Assignee: wgcn
>            Priority: Major
>              Labels: pull-request-available, yarn
>
> The TM container will be killed by the NodeManager because of exceeded physical
> memory. I found that the launch context launching the TM container sets
> "container memory = heap memory + offHeapSizeMB" at
> org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters,
> lines 160 to 166. I set a safety margin for the whole memory the container uses.
> For example, if the container limit is 3g of memory, the sum
> "heap memory + offHeapSizeMB" is equal to 2.4g, to prevent the container from
> being killed. Do we have a ready-made solution, or I can commit my solution.
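The property named in the deleted comment is a standard YARN NodeManager setting. A yarn-site.xml fragment that disables the physical-memory check — use with care, since it removes YARN's protection against runaway containers:

```xml
<configuration>
  <!-- Disable the NodeManager's physical memory check so containers are not
       killed for exceeding their requested physical memory. Default: true. -->
  <property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
  </property>
</configuration>
```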
[jira] [Commented] (FLINK-11860) Remove all the usage of deprecated unit-provided memory options in docs and scripts
[ https://issues.apache.org/jira/browse/FLINK-11860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787629#comment-16787629 ]

Yun Tang commented on FLINK-11860:
----------------------------------

This issue is inspired by the user mail
https://lists.apache.org/thread.html/c130622cac22cec96106b5770ad8ec2453f85badf0dd1d00cd9aa420@%3Cuser.flink.apache.org%3E

> Remove all the usage of deprecated unit-provided memory options in docs and
> scripts
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-11860
>                 URL: https://issues.apache.org/jira/browse/FLINK-11860
>             Project: Flink
>          Issue Type: Improvement
>          Components: Deployment / Scripts, Documentation
>            Reporter: Yun Tang
>            Assignee: Yun Tang
>            Priority: Major
>             Fix For: 1.9.0
>
> Currently, unit-specific options, e.g. {{jobmanager.heap.mb}} and
> {{taskmanager.heap.mb}}, have already been deprecated. However, these options
> are still shown in documentation and deployment scripts. We should remove
> them so as not to confuse users.
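For context, the deprecated options take a bare megabyte number, while their replacements accept an explicit unit suffix. A flink-conf.yaml sketch of the migration (the replacement option names are assumed from the deprecation notices, not stated in this thread):

```yaml
# Deprecated: plain megabyte values
# jobmanager.heap.mb: 1024
# taskmanager.heap.mb: 2048

# Replacements: memory sizes with an explicit unit
jobmanager.heap.size: 1024m
taskmanager.heap.size: 2048m
```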
[GitHub] [flink] JingsongLi commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format
JingsongLi commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format
URL: https://github.com/apache/flink/pull/7913#discussion_r263688467

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java ##
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * Join two row to one row.
+ */
+public final class JoinedRow implements BaseRow {
+
+	private BaseRow row1;
+	private BaseRow row2;
+	private byte header;
+
+	public JoinedRow() {}
+
+	public JoinedRow(BaseRow row1, BaseRow row2) {
+		this.row1 = row1;
+		this.row2 = row2;
+	}
+
+	public JoinedRow replace(BaseRow row1, BaseRow row2) {
+		this.row1 = row1;
+		this.row2 = row2;
+		return this;
+	}
+
+	@Override
+	public int getArity() {
+		return row1.getArity() + row2.getArity();
+	}
+
+	@Override
+	public byte getHeader() {
+		return header;
+	}
+
+	@Override
+	public void setHeader(byte header) {
+		this.header = header;
+	}
+
+	@Override
+	public boolean isNullAt(int i) {
+		if (i < row1.getArity()) {
+			return row1.isNullAt(i);
+		} else {
+			return row2.isNullAt(i - row1.getArity());
+		}
+	}
+
+	@Override
+	public boolean getBoolean(int i) {
+		if (i < row1.getArity()) {
+			return row1.getBoolean(i);
+		} else {
+			return row2.getBoolean(i - row1.getArity());
+		}
+	}
+
+	@Override
+	public byte getByte(int i) {
+		if (i < row1.getArity()) {
+			return row1.getByte(i);
+		} else {
+			return row2.getByte(i - row1.getArity());
+		}
+	}
+
+	@Override
+	public short getShort(int i) {
+		if (i < row1.getArity()) {
+			return row1.getShort(i);
+		} else {
+			return row2.getShort(i - row1.getArity());
+		}
+	}
+
+	@Override
+	public int getInt(int i) {
+		if (i < row1.getArity()) {
+			return row1.getInt(i);
+		} else {
+			return row2.getInt(i - row1.getArity());
+		}
+	}
+
+	@Override
+	public long getLong(int i) {
+		if (i < row1.getArity()) {
+			return row1.getLong(i);
+		} else {
+			return row2.getLong(i - row1.getArity());
+		}
+	}
+
+	@Override
+	public float getFloat(int i) {
+		if (i < row1.getArity()) {
+			return row1.getFloat(i);
+		} else {
+			return row2.getFloat(i - row1.getArity());
+		}
+	}
+
+	@Override
+	public double getDouble(int i) {
+		if (i < row1.getArity()) {
+			return row1.getDouble(i);
+		} else {
+			return row2.getDouble(i - row1.getArity());
+		}
+	}
+
+	@Override
+	public char getChar(int i) {
+		if (i < row1.getArity()) {
+			return row1.getChar(i);
+		} else {
+			return row2.getChar(i - row1.getArity());
+		}
+	}
+
+	@Override
+	public Decimal getDecimal(int i, int precision, int scale) {
+		if (i < row1.getArity()) {
+			return row1.getDecimal(i, precision, scale);
+		} else {
+			return row2.getDecimal(i - row1.getArity(), precision, scale);
+		}
+	}
+
+	@Override
+	public BinaryGeneric getGeneric(int i) {
+		if (i < row1.getArity()) {
+			retu
[GitHub] [flink] JingsongLi commented on a change in pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
JingsongLi commented on a change in pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
URL: https://github.com/apache/flink/pull/7941#discussion_r263676627

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/Lz4BlockDecompressor.java ##
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.compression;
+
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import net.jpountz.util.SafeUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Decode data written with {@link Lz4BlockCompressor}.
+ * It reads from and writes to byte arrays provided from the outside, thus reducing copy time.
+ *
+ * This class is copied and modified from {@link net.jpountz.lz4.LZ4BlockInputStream}.
+ */
+public class Lz4BlockDecompressor implements BlockDecompressor {
+
+	private final LZ4FastDecompressor decompressor;
+
+	public Lz4BlockDecompressor() {
+		this.decompressor = LZ4Factory.fastestInstance().fastDecompressor();
+	}
+
+	@Override
+	public int decompress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+			throws DataCorruptionException {
+		final int prevSrcOff = src.position() + srcOff;
+		final int prevDstOff = dst.position() + dstOff;
+
+		final int compressedLen = src.getInt(prevSrcOff);
+		final int originalLen = src.getInt(prevSrcOff + 4);
+		validateLength(compressedLen, originalLen);
+
+		if (dst.capacity() - prevDstOff < originalLen) {
+			throw new InsufficientBufferException("Buffer length too small");
+		}
+
+		if (src.limit() - prevSrcOff - Lz4BlockCompressionFactory.HEADER_LENGTH < compressedLen) {

Review comment:
    import Lz4BlockCompressionFactory.HEADER_LENGTH?

----
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
JingsongLi commented on a change in pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
URL: https://github.com/apache/flink/pull/7941#discussion_r263687636

## File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/compression/BlockCompressionTest.java ##
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.compression;
+
+import org.junit.Test;
+import sun.misc.Cleaner;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class BlockCompressionTest {
+
+	@Test
+	public void testLz4() {

Review comment:
    test DataCorruptionException & InsufficientBufferException?
[GitHub] [flink] JingsongLi commented on a change in pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
JingsongLi commented on a change in pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
URL: https://github.com/apache/flink/pull/7941#discussion_r263676219

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/BlockCompressionFactory.java ##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.compression;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Each compression codec has a implementation of {@link BlockCompressionFactory}
+ * to create compressors and decompressors.
+ */
+public interface BlockCompressionFactory {
+
+	void setConfiguration(Configuration configuration);

Review comment:
    I don't know what this is for. Remove it?
[GitHub] [flink] KurtYoung commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format
KurtYoung commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format
URL: https://github.com/apache/flink/pull/7913#discussion_r263687900

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java ##
@@ -0,0 +1,303 @@
[... same JoinedRow diff context as quoted above; comment body truncated in the digest ...]
[GitHub] [flink] JingsongLi commented on a change in pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
JingsongLi commented on a change in pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
URL: https://github.com/apache/flink/pull/7941#discussion_r263675547

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/compression/Lz4BlockCompressor.java ##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.compression;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Encode data into LZ4 format (not compatible with the LZ4 Frame format).
+ * It reads from and writes to byte arrays provided from the outside, thus reducing copy time.
+ *
+ * This class is copied and modified from {@link net.jpountz.lz4.LZ4BlockOutputStream}.
+ */
+public class Lz4BlockCompressor implements BlockCompressor {
+
+	private final LZ4Compressor compressor;
+
+	public Lz4BlockCompressor() {
+		this.compressor = LZ4Factory.fastestInstance().fastCompressor();
+	}
+
+	@Override
+	public int getMaxCompressedSize(int srcSize) {
+		return Lz4BlockCompressionFactory.HEADER_LENGTH + compressor.maxCompressedLength(srcSize);

Review comment:
    import Lz4BlockCompressionFactory.HEADER_LENGTH?
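The compressor and decompressor quoted in this thread share an 8-byte header: compressedLen at offset 0 and originalLen at offset 4. A self-contained sketch of that block layout, with java.util.zip's Deflater/Inflater standing in for LZ4 (so lz4-java is not required); class and constant names here are hypothetical, not the PR's API:

```java
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/**
 * Sketch of the block layout: [compressedLen:int][originalLen:int][payload].
 * java.util.zip stands in for LZ4; all names are hypothetical.
 */
public final class BlockCodecSketch {

	static final int HEADER_LENGTH = 8; // two ints, as in the quoted code

	public static byte[] compress(byte[] src) {
		Deflater deflater = new Deflater();
		deflater.setInput(src);
		deflater.finish();
		// generous worst case for this sketch; real codecs expose maxCompressedLength()
		byte[] tmp = new byte[src.length + src.length / 10 + 64];
		int compressedLen = deflater.deflate(tmp);
		deflater.end();
		ByteBuffer out = ByteBuffer.allocate(HEADER_LENGTH + compressedLen);
		out.putInt(compressedLen).putInt(src.length).put(tmp, 0, compressedLen);
		return out.array();
	}

	public static byte[] decompress(byte[] block) throws DataFormatException {
		ByteBuffer in = ByteBuffer.wrap(block);
		int compressedLen = in.getInt(); // header field 1
		int originalLen = in.getInt();   // header field 2
		Inflater inflater = new Inflater();
		inflater.setInput(block, HEADER_LENGTH, compressedLen);
		byte[] dst = new byte[originalLen];
		int n = inflater.inflate(dst);
		inflater.end();
		if (n != originalLen) {
			throw new DataFormatException("corrupt block: expected " + originalLen + " bytes, got " + n);
		}
		return dst;
	}
}
```

Storing the original length in the header lets the decompressor validate the destination buffer size up front, which is what the quoted InsufficientBufferException check does.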
[GitHub] [flink] KurtYoung commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format
KurtYoung commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format
URL: https://github.com/apache/flink/pull/7913#discussion_r263687825

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java ##
@@ -0,0 +1,303 @@
[... same JoinedRow diff context as quoted above; comment body truncated in the digest ...]
[GitHub] [flink] QiLuo-BD commented on issue #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
QiLuo-BD commented on issue #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
URL: https://github.com/apache/flink/pull/7938#issuecomment-470832337

LGTM
[jira] [Assigned] (FLINK-11860) Remove all the usage of deprecated unit-provided memory options in docs and scripts
[ https://issues.apache.org/jira/browse/FLINK-11860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang reassigned FLINK-11860:
--------------------------------

    Assignee: Yun Tang

> Remove all the usage of deprecated unit-provided memory options in docs and
> scripts
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-11860
>                 URL: https://issues.apache.org/jira/browse/FLINK-11860
>             Project: Flink
>          Issue Type: Improvement
>          Components: Deployment / Scripts, Documentation
>            Reporter: Yun Tang
>            Assignee: Yun Tang
>            Priority: Major
>             Fix For: 1.9.0
>
> Currently, unit-specific options, e.g. {{jobmanager.heap.mb}} and
> {{taskmanager.heap.mb}}, have already been deprecated. However, these options
> are still shown in documentation and deployment scripts. We should remove
> them so as not to confuse users.
[jira] [Created] (FLINK-11860) Remove all the usage of deprecated unit-provided memory options in docs and scripts
Yun Tang created FLINK-11860:
--------------------------------

             Summary: Remove all the usage of deprecated unit-provided memory options in docs and scripts
                 Key: FLINK-11860
                 URL: https://issues.apache.org/jira/browse/FLINK-11860
             Project: Flink
          Issue Type: Improvement
          Components: Deployment / Scripts, Documentation
            Reporter: Yun Tang
             Fix For: 1.9.0


Currently, unit-specific options, e.g. {{jobmanager.heap.mb}} and {{taskmanager.heap.mb}}, have already been deprecated. However, these options are still shown in documentation and deployment scripts. We should remove them so as not to confuse users.
[GitHub] [flink] KurtYoung commented on issue #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
KurtYoung commented on issue #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
URL: https://github.com/apache/flink/pull/7941#issuecomment-470829598

cc @JingsongLi
[GitHub] [flink] klion26 edited a comment on issue #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class
klion26 edited a comment on issue #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class
URL: https://github.com/apache/flink/pull/7921#issuecomment-470779712

@azagrebin thank you for your review, I've updated the code, PTAL when you have time. I did not find any reference to `org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic` in `docs/dev/stream/state/state.md` — am I missing something?

Hi @rmetzger, it seems flinkbot is offline: azagrebin approved all, but flinkbot didn't respond. I found that the problem also exists on other PRs: #7935 #7938
[jira] [Created] (FLINK-11859) Improve SpanningRecordSerializer performance by serializing record length to serialization buffer directly
Yingjie Cao created FLINK-11859: --- Summary: Improve SpanningRecordSerializer performance by serializing record length to serialization buffer directly Key: FLINK-11859 URL: https://issues.apache.org/jira/browse/FLINK-11859 Project: Flink Issue Type: Improvement Reporter: Yingjie Cao Assignee: Yingjie Cao In the current implementation of SpanningRecordSerializer, the length of a record is serialized to an intermediate length buffer and then copied to the target buffer. Actually, the length filed can be serialized directly to the data buffer (serializationBuffer), which can avoid the copy of length buffer. Though the total bytes copied remain unchanged, it one copy of a small record which incurs high overhead. The flink-benchmarks shows it can improve performance and the test results are as follows. Result with the optimization: |Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend| |KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2228.049605|77.631804|ops/ms| | | |KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3968.361739|193.501755|ops/ms| | | |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3030.016702|29.272713|ops/ms| |MEMORY| |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2754.77678|26.215395|ops/ms| |FS| |MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|3001.957606|29.288019|ops/ms| |FS_ASYNC| |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|123.698984|3.339233|ops/ms| |ROCKS| |RocksStateBackendBenchmark.stateBackends|thrpt|1|30|126.252137|1.137735|ops/ms| |ROCKS_INC| |SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|323.658098|5.855697|ops/ms| | | |SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|183.34423|3.710787|ops/ms| | | |SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|404.380233|5.131744|ops/ms| | | |SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|527.193369|10.176726|ops/ms| | | 
|SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|550.073024|11.724412|ops/ms| | |
|StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|564.690627|13.766809|ops/ms| | |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|49918.11806|2324.234776|ops/ms|100,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10443.63491|315.835962|ops/ms|100,100ms,SSL| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|21387.47608|2779.832704|ops/ms|1000,1ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|26585.85453|860.243347|ops/ms|1000,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|8252.563405|947.129028|ops/ms|1000,100ms,SSL| |
|SumLongsBenchmark.benchmarkCount|thrpt|1|30|8806.021402|263.995836|ops/ms| | |
|WindowBenchmarks.globalWindow|thrpt|1|30|4573.620126|112.099391|ops/ms| | |
|WindowBenchmarks.sessionWindow|thrpt|1|30|585.246412|7.026569|ops/ms| | |
|WindowBenchmarks.slidingWindow|thrpt|1|30|449.302134|4.123669|ops/ms| | |
|WindowBenchmarks.tumblingWindow|thrpt|1|30|2979.806858|33.818909|ops/ms| | |
|StreamNetworkLatencyBenchmarkExecutor.networkLatency1to1|avgt|1|30|12.842865|0.13796|ms/op| | |
Result without the optimization:
|Benchmark|Mode|Threads|Samples|Score|Score Error (99.9%)|Unit|Param: channelsFlushTimeout|Param: stateBackend|
|KeyByBenchmarks.arrayKeyBy|thrpt|1|30|2060.241715|59.898485|ops/ms| | |
|KeyByBenchmarks.tupleKeyBy|thrpt|1|30|3645.306819|223.821719|ops/ms| | |
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2992.698822|36.978115|ops/ms| |MEMORY|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2756.10949|27.798937|ops/ms| |FS|
|MemoryStateBackendBenchmark.stateBackends|thrpt|1|30|2965.969876|44.159793|ops/ms| |FS_ASYNC|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|125.506942|1.245978|ops/ms| |ROCKS|
|RocksStateBackendBenchmark.stateBackends|thrpt|1|30|127.258737|1.190588|ops/ms| |ROCKS_INC|
|SerializationFrameworkMiniBenchmarks.serializerAvro|thrpt|1|30|316.497954|8.309241|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerKryo|thrpt|1|30|189.065149|6.302073|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerPojo|thrpt|1|30|391.51305|7.750728|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerRow|thrpt|1|30|513.611151|10.640899|ops/ms| | |
|SerializationFrameworkMiniBenchmarks.serializerTuple|thrpt|1|30|534.184947|14.370082|ops/ms| | |
|StreamNetworkBroadcastThroughputBenchmarkExecutor.networkBroadcastThroughput|thrpt|1|30|483.388618|19.506723|ops/ms| | |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|42777.70615|4981.87539|ops/ms|100,100ms| |
|StreamNetworkThroughputBenchmarkExecutor.networkThroughput|thrpt|1|30|10201.48525|286.248845|ops/ms|100,100ms,SSL| |
|StreamNetworkThr
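The optimization above can be illustrated with a short sketch. This is a simplified, hypothetical illustration of the idea, not Flink's actual `SpanningRecordSerializer` code: previously the record length passed through a separate 4-byte length buffer before being copied into the target buffer, whereas the change writes the length straight into the data buffer.

```java
import java.nio.ByteBuffer;

// Simplified sketch of the optimization in FLINK-11859; method names
// are illustrative, not Flink's actual SpanningRecordSerializer API.
public class LengthSerializationSketch {

    // Before: the record length is serialized into a separate 4-byte
    // buffer, which is then copied into the target buffer - one extra
    // copy per record.
    static void writeWithLengthBuffer(ByteBuffer target, byte[] record) {
        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
        lengthBuffer.putInt(record.length);
        lengthBuffer.flip();
        target.put(lengthBuffer); // the extra copy
        target.put(record);
    }

    // After: the length is written directly into the data buffer,
    // avoiding the intermediate copy.
    static void writeDirectly(ByteBuffer target, byte[] record) {
        target.putInt(record.length);
        target.put(record);
    }

    public static void main(String[] args) {
        byte[] record = "payload".getBytes();
        ByteBuffer a = ByteBuffer.allocate(64);
        ByteBuffer b = ByteBuffer.allocate(64);
        writeWithLengthBuffer(a, record);
        writeDirectly(b, record);
        a.flip();
        b.flip();
        // Both layouts are byte-identical; only the number of copies differs.
        System.out.println(a.equals(b)); // prints "true"
    }
}
```

The serialized bytes are identical either way, which is why the change is purely a performance improvement: only the number of buffer-to-buffer copies per record changes.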
[jira] [Updated] (FLINK-11696) [checkpoint] Avoid to send mkdir requests to DFS from task side
[ https://issues.apache.org/jira/browse/FLINK-11696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11696: --- Labels: pull-request-available (was: ) > [checkpoint] Avoid to send mkdir requests to DFS from task side > --- > > Key: FLINK-11696 > URL: https://issues.apache.org/jira/browse/FLINK-11696 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing >Reporter: Yun Tang >Assignee: Yun Tang >Priority: Major > Labels: pull-request-available > > Currently, when we create the checkpoint directory in a distributed file > system, not only {{CheckpointCoordinator}} but also {{FsCheckpointStorage}} > in each {{StreamTask}} creates the {{checkpointsDirectory}}, > {{sharedStateDirectory}} and {{taskOwnedStateDirectory}}. These many > {{mkdir}} RPC requests put very high pressure on the distributed > file system, especially when the parallelism is large or jobs fail > over again and again. > We could avoid these {{mkdir}} requests from the task side when writing to a > distributed file system. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] Myasuka opened a new pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
Myasuka opened a new pull request #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side URL: https://github.com/apache/flink/pull/7942 ## What is the purpose of the change Previously, not only the checkpoint coordinator but also every task would send `mkdir` requests during initialization. This put a lot of pressure on the master of the distributed file system. This pull request avoids calling the file system's `mkdir` when creating a `CheckpointStorage`, calling it only once when the checkpoint coordinator runs `initializeLocationForCheckpoint`. ## Brief change log - Perform the `mkdir` operations lazily on the first `initializeLocationForCheckpoint` instead of each time a `CheckpointStorage` is created. ## Verifying this change This change added tests and can be verified as follows: - refactored `testStorageLocationMkdirs` to verify that checkpoint directories are created on `initializeLocationForCheckpoint` instead of each time a `CheckpointStorage` is created. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
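The lazy-creation scheme described in the PR can be sketched as follows. This is a minimal illustration under stated assumptions, not Flink's actual `FsCheckpointStorage`: only the method name `initializeLocationForCheckpoint` comes from the PR, the class layout and the `chk-` naming are hypothetical, and `java.nio.file` stands in for the DFS client.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the idea in FLINK-11696: move directory creation out of the
// storage constructor (which runs on every task) and into a method that
// only the checkpoint coordinator calls.
public class CheckpointStorageSketch {

    private final Path checkpointsDirectory;

    // Before the change, the constructor issued mkdir requests - one RPC
    // to the DFS master per parallel task instance. Now it only records
    // the path.
    public CheckpointStorageSketch(Path checkpointsDirectory) {
        this.checkpointsDirectory = checkpointsDirectory; // no mkdirs() here
    }

    // After the change, directories are created lazily, and only the
    // coordinator calls this - once per checkpoint location rather than
    // once per task.
    public Path initializeLocationForCheckpoint(long checkpointId) {
        try {
            Path dir = checkpointsDirectory.resolve("chk-" + checkpointId);
            return Files.createDirectories(dir); // the single mkdir request
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a parallelism of N, this reduces the number of `mkdir` RPCs per checkpoint location from roughly N + 1 to 1, which is the pressure reduction the issue describes.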
[GitHub] [flink] flinkbot commented on issue #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
flinkbot commented on issue #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side URL: https://github.com/apache/flink/pull/7942#issuecomment-470819951 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11858) Introduce block compressor/decompressor for batch table runtime
[ https://issues.apache.org/jira/browse/FLINK-11858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11858: --- Labels: pull-request-available (was: ) > Introduce block compressor/decompressor for batch table runtime > --- > > Key: FLINK-11858 > URL: https://issues.apache.org/jira/browse/FLINK-11858 > Project: Flink > Issue Type: Improvement > Components: Runtime / Operators >Reporter: Kurt Young >Assignee: Kurt Young >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
flinkbot commented on issue #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime URL: https://github.com/apache/flink/pull/7941#issuecomment-470816714 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung opened a new pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
KurtYoung opened a new pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime URL: https://github.com/apache/flink/pull/7941 ## What is the purpose of the change Introduce BlockCompressor and BlockDecompressor to batch table runtime for future usage. ## Brief change log - Add BlockCompressor interface - Add BlockDecompressor interface - Add LZ4 implementation based on the interfaces ## Verifying this change Added unit tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes, lz4-java is introduced which is Apache license) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (JavaDocs) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
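The pair of interfaces the PR introduces can be sketched as below. The interface names mirror the PR description, but the method signatures are assumptions, and `java.util.zip` (Deflate) stands in for the lz4-java based implementation so that the example needs no external dependency.

```java
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Sketch of block-level compression as in FLINK-11858: whole blocks are
// compressed and decompressed at once, with the original length tracked
// out of band.
interface BlockCompressor {
    byte[] compress(byte[] src);
}

interface BlockDecompressor {
    /** The caller is expected to know the original block length. */
    byte[] decompress(byte[] compressed, int originalLength);
}

class DeflateBlockCompressor implements BlockCompressor {
    @Override
    public byte[] compress(byte[] src) {
        Deflater deflater = new Deflater();
        deflater.setInput(src);
        deflater.finish();
        // Head-room for incompressible blocks; a real codec would expose
        // a maxCompressedLength() bound instead of this guess.
        byte[] buf = new byte[src.length + 64];
        int len = 0;
        while (!deflater.finished()) {
            len += deflater.deflate(buf, len, buf.length - len);
        }
        deflater.end();
        return Arrays.copyOf(buf, len);
    }
}

class DeflateBlockDecompressor implements BlockDecompressor {
    @Override
    public byte[] decompress(byte[] compressed, int originalLength) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        byte[] out = new byte[originalLength];
        try {
            int len = 0;
            while (!inflater.finished() && len < originalLength) {
                len += inflater.inflate(out, len, originalLength - len);
            }
            if (len != originalLength) {
                throw new IllegalStateException("corrupt or truncated block");
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        } finally {
            inflater.end();
        }
        return out;
    }
}
```

Passing the original length to the decompressor lets it allocate the output buffer exactly once, which is one reason block codecs are a good fit for the fixed-size buffers of a batch runtime.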
[GitHub] [flink] flinkbot commented on issue #7940: [hotfix][docs] fix error in functions example
flinkbot commented on issue #7940: [hotfix][docs] fix error in functions example URL: https://github.com/apache/flink/pull/7940#issuecomment-470805294 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] leesf opened a new pull request #7940: [hotfix][docs] fix error in functions example
leesf opened a new pull request #7940: [hotfix][docs] fix error in functions example URL: https://github.com/apache/flink/pull/7940 fix error in functions example. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format
wuchong commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format URL: https://github.com/apache/flink/pull/7913#discussion_r263663630 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryGenericTypeInfo.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.typeutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.dataformat.BinaryGeneric; +import org.apache.flink.table.type.GenericType; + +/** + * TypeInfo for BaseArray. Review comment: for BinaryGeneric ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format
wuchong commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format URL: https://github.com/apache/flink/pull/7913#discussion_r263663580 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BinaryGenericSerializer.java ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; +import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.table.dataformat.BinaryGeneric; +import org.apache.flink.table.dataformat.Decimal; +import org.apache.flink.table.type.GenericType; +import org.apache.flink.table.util.SegmentsUtil; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; + +/** + * Serializer for {@link Decimal}. Review comment: for BinaryGeneric ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11858) Introduce block compressor/decompressor for batch table runtime
Kurt Young created FLINK-11858: -- Summary: Introduce block compressor/decompressor for batch table runtime Key: FLINK-11858 URL: https://issues.apache.org/jira/browse/FLINK-11858 Project: Flink Issue Type: Improvement Components: Runtime / Operators Reporter: Kurt Young Assignee: Kurt Young -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11857) Introduce BinaryExternalSorter to batch table runtime
Jingsong Lee created FLINK-11857: Summary: Introduce BinaryExternalSorter to batch table runtime Key: FLINK-11857 URL: https://issues.apache.org/jira/browse/FLINK-11857 Project: Flink Issue Type: New Feature Components: Runtime / Operators Environment: We need a sorter to take full advantage of the high performance of the Binary format. Reporter: Jingsong Lee Assignee: Jingsong Lee -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] TisonKun edited a comment on issue #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint
TisonKun edited a comment on issue #7927: [FLINK-11603][metrics] Port the MetricQueryService to the new RpcEndpoint URL: https://github.com/apache/flink/pull/7927#issuecomment-470792906 travis failed on ``` 02:03:48.550 [ERROR] Failures: 02:03:48.550 [ERROR] YARNHighAvailabilityITCase>YarnTestBase.sleep:236 There is at least one application on the cluster that is not finished.[App application_1552010113856_0001 is in state RUNNING.] 02:03:48.550 [ERROR] Errors: 02:03:48.551 [ERROR] org.apache.flink.yarn.YARNHighAvailabilityITCase.testJobRecoversAfterKillingTaskManager(org.apache.flink.yarn.YARNHighAvailabilityITCase) 02:03:48.551 [ERROR] Run 1: YARNHighAvailabilityITCase.testJobRecoversAfterKillingTaskManager:193->waitUntilJobIsRestarted:336 » IllegalArgument 02:03:48.551 [ERROR] Run 2: YARNHighAvailabilityITCase>YarnTestBase.sleep:236 There is at least one application on the cluster that is not finished.[App application_1552010113856_0001 is in state RUNNING.] 02:03:48.551 [INFO] ``` However, I cannot reproduce locally. Since the reported failing point is about querying metric it **might** be relevant but I see no chance to cause an `IllegalArgument` exception. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes
JingsongLi commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes URL: https://github.com/apache/flink/pull/7931#discussion_r263655776 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java ## @@ -63,6 +91,22 @@ public static int calculateFixPartSizeInBytes(int arity) { return calculateBitSetWidthInBytes(arity) + 8 * arity; } + /** +* If it is a fixed-length field, we can call this BinaryRow's setXX method for in-place updates. +* If it is variable-length field, can't use this method, because the underlying data is stored continuously. +*/ + public static boolean isFixedLength(InternalType type) { + if (type instanceof DecimalType) { + return ((DecimalType) type).precision() <= DecimalType.MAX_COMPACT_PRECISION; + } else { + return MUTABLE_FIELD_TYPES.contains(type); + } + } + + public static boolean isMutable(InternalType type) { + return MUTABLE_FIELD_TYPES.contains(type) || type instanceof DecimalType; Review comment: No matter what the precision is, it is mutable. If the precision is compact, the Decimal is stored in 8 bytes in the fixed-length part; if the precision is not compact, the Decimal is stored in 16 bytes in the variable-length part. It can always be updated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes
JingsongLi commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes URL: https://github.com/apache/flink/pull/7931#discussion_r263655861 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java ## @@ -63,6 +91,22 @@ public static int calculateFixPartSizeInBytes(int arity) { return calculateBitSetWidthInBytes(arity) + 8 * arity; } + /** +* If it is a fixed-length field, we can call this BinaryRow's setXX method for in-place updates. +* If it is variable-length field, can't use this method, because the underlying data is stored continuously. +*/ + public static boolean isFixedLength(InternalType type) { + if (type instanceof DecimalType) { + return ((DecimalType) type).precision() <= DecimalType.MAX_COMPACT_PRECISION; + } else { + return MUTABLE_FIELD_TYPES.contains(type); + } + } + + public static boolean isMutable(InternalType type) { + return MUTABLE_FIELD_TYPES.contains(type) || type instanceof DecimalType; Review comment: method isFixedLength should be isInFixedLengthPart This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11856) Introduce BinaryHashTable and LongHashTable to batch table runtime
Kurt Young created FLINK-11856: -- Summary: Introduce BinaryHashTable and LongHashTable to batch table runtime Key: FLINK-11856 URL: https://issues.apache.org/jira/browse/FLINK-11856 Project: Flink Issue Type: Improvement Components: Runtime / Operators Reporter: Kurt Young Assignee: Kurt Young -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] wuchong commented on issue #7923: [FLINK-11703][table-planner-blink] Introduce TableEnvironments and support registerDataStream and sqlQuery
wuchong commented on issue #7923: [FLINK-11703][table-planner-blink] Introduce TableEnvironments and support registerDataStream and sqlQuery URL: https://github.com/apache/flink/pull/7923#issuecomment-470792029 Commits squashed and re-titled. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on issue #7913: [FLINK-11837][table-runtime-blink] Improve internal data format
JingsongLi commented on issue #7913: [FLINK-11837][table-runtime-blink] Improve internal data format URL: https://github.com/apache/flink/pull/7913#issuecomment-470791761 @KurtYoung Thanks for your suggestion, I have fixed it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on issue #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions
zhijiangW commented on issue #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions URL: https://github.com/apache/flink/pull/7186#issuecomment-470780791 Agree with @azagrebin 's suggestions for checking partition registered/released status in current `NetworkEnvironment#ResultPartitionManager` which is suitable for the plan of `ShuffleService`. And it would be better if we could provide the option to support two strategies for releasing TM (waiting consumer or not). I have no other concerns. :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
klion26 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials URL: https://github.com/apache/flink/pull/7895#discussion_r263646747 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ## @@ -18,25 +18,54 @@ package org.apache.flink.yarn; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.util.TestLogger; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.File; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; Review comment: @walterddr I'm not sure using mockito will introduce instability in the flink-yarn/flink-yarn-test package, I saw this before, but I can 
not remember which package it is. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] klion26 commented on issue #7921: [FLINK-11825][StateBackends] Resolve name clash of StateTTL TimeCharacteristic class
klion26 commented on issue #7921: [FLINK-11825][StateBackends] Resolve name clash of StateTTL TimeCharacteristic class URL: https://github.com/apache/flink/pull/7921#issuecomment-470779712 @azagrebin thank you for your review, I've updated the code, PTAL when you have time. hi @rmetzger seems flinkbot is off-line, azagrebin approved all, but flinkbot didn't respond. And I found that the problem exists on the other prs also. #7935 #7938
[GitHub] [flink] KurtYoung commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes
KurtYoung commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes URL: https://github.com/apache/flink/pull/7931#discussion_r263639880 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java ## @@ -63,6 +91,22 @@ public static int calculateFixPartSizeInBytes(int arity) { return calculateBitSetWidthInBytes(arity) + 8 * arity; } + /** +* If it is a fixed-length field, we can call this BinaryRow's setXX method for in-place updates. +* If it is variable-length field, can't use this method, because the underlying data is stored continuously. +*/ + public static boolean isFixedLength(InternalType type) { + if (type instanceof DecimalType) { + return ((DecimalType) type).precision() <= DecimalType.MAX_COMPACT_PRECISION; + } else { + return MUTABLE_FIELD_TYPES.contains(type); + } + } + + public static boolean isMutable(InternalType type) { + return MUTABLE_FIELD_TYPES.contains(type) || type instanceof DecimalType; Review comment: do we also need to check Decimal's precision just like `isFixedLength` does?
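One way to read the reviewer's suggestion is to apply the same precision guard in `isMutable` that `isFixedLength` already uses. The sketch below is self-contained and illustrative only: the stand-in `InternalType`/`DecimalType` classes, the member type set, and the value 18 for `MAX_COMPACT_PRECISION` are assumptions made so the example runs without Flink, not Flink's actual classes.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical stand-ins for Flink's internal type classes.
public class DecimalMutabilitySketch {
    interface InternalType {}
    static final InternalType INT = new InternalType() {};
    static final InternalType DOUBLE = new InternalType() {};

    static class DecimalType implements InternalType {
        static final int MAX_COMPACT_PRECISION = 18; // assumed: precision that fits a long
        final int precision;
        DecimalType(int precision) { this.precision = precision; }
    }

    static final Set<InternalType> MUTABLE_FIELD_TYPES = new HashSet<>();
    static { MUTABLE_FIELD_TYPES.add(INT); MUTABLE_FIELD_TYPES.add(DOUBLE); }

    // Mirrors the PR's isFixedLength: a decimal is fixed-length only in compact form.
    static boolean isFixedLength(InternalType type) {
        if (type instanceof DecimalType) {
            return ((DecimalType) type).precision <= DecimalType.MAX_COMPACT_PRECISION;
        }
        return MUTABLE_FIELD_TYPES.contains(type);
    }

    // The variant the review comment hints at: apply the same precision guard,
    // so a non-compact decimal is not treated as in-place updatable.
    static boolean isMutable(InternalType type) {
        if (type instanceof DecimalType) {
            return ((DecimalType) type).precision <= DecimalType.MAX_COMPACT_PRECISION;
        }
        return MUTABLE_FIELD_TYPES.contains(type);
    }

    public static void main(String[] args) {
        System.out.println(isMutable(new DecimalType(10))); // compact decimal
        System.out.println(isMutable(new DecimalType(30))); // non-compact decimal
    }
}
```

Whether the guard actually belongs in `isMutable` is exactly the open question of the review thread; the sketch only shows what the guarded variant would look like.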
[GitHub] [flink] KurtYoung commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes
KurtYoung commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes URL: https://github.com/apache/flink/pull/7931#discussion_r263641774 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExpandUtil.scala ## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexNode} + +import java.util + +import scala.collection.JavaConversions._ + +/** + * Utility methods for expand operator. + */ +object ExpandUtil { + + def projectsToString( Review comment: Can we try to move all methods like these to one util class? Various logical or physical operators will share some name formatting utilities. There will be a lot of repetition if we organize it by operator.
[GitHub] [flink] JingsongLi commented on issue #7913: [FLINK-11837][table-runtime-blink] Improve internal data format
JingsongLi commented on issue #7913: [FLINK-11837][table-runtime-blink] Improve internal data format URL: https://github.com/apache/flink/pull/7913#issuecomment-470774081 > BTW, `NestedRow` has not been covered by any test case Yes, I will add test for NestedRow, JoinedRow, BoxedWrapperRow and GenericRow
[jira] [Assigned] (FLINK-11852) Improve Processing function example
[ https://issues.apache.org/jira/browse/FLINK-11852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-11852: Assignee: TANG Wen-hui > Improve Processing function example > --- > > Key: FLINK-11852 > URL: https://issues.apache.org/jira/browse/FLINK-11852 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.7.2 >Reporter: Flavio Pompermaier >Assignee: TANG Wen-hui >Priority: Minor > > In the processing function documentation > ([https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html)] > there's an "abusive" usage of the timers since a new timer is registered for > every new tuple coming in. This could cause problems in terms of allocated > objects and could burden the overall application. > It could worth to mention this problem and remove useless timers, e.g.: > > {code:java} > CountWithTimestamp current = state.value(); > if (current == null) { > current = new CountWithTimestamp(); > current.key = value.f0; > } else { > ctx.timerService().deleteEventTimeTimer(current.lastModified + timeout); > }{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
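The delete-then-register pattern from the issue's snippet can be illustrated without a Flink runtime. In the sketch below the Flink `TimerService` and keyed state are replaced by a plain set and map (all names here are illustrative, not Flink's API); it shows that deleting the timer of the previous element before registering a new one leaves at most one pending timer per key, instead of one per tuple.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Self-contained stand-in for the ProcessFunction pattern discussed in the issue.
public class TimerDedupSketch {
    static final long TIMEOUT = 60_000L;

    final Set<Long> pendingTimers = new HashSet<>();        // stands in for ctx.timerService()
    final Map<String, Long> lastModified = new HashMap<>(); // stands in for keyed state

    void onElement(String key, long timestamp) {
        Long previous = lastModified.get(key);
        if (previous != null) {
            // Delete the timer registered for the previous element of this key,
            // rather than letting one timer per element accumulate.
            pendingTimers.remove(previous + TIMEOUT);
        }
        lastModified.put(key, timestamp);
        pendingTimers.add(timestamp + TIMEOUT);
    }

    public static void main(String[] args) {
        TimerDedupSketch fn = new TimerDedupSketch();
        for (long t = 0; t < 5; t++) {
            fn.onElement("sensor-1", t * 1_000L);
        }
        // Only the timer for the most recent element of the key survives.
        System.out.println(fn.pendingTimers.size());
    }
}
```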
[jira] [Commented] (FLINK-11819) Additional attribute for order by not support by flink sql, but told supported in doc
[ https://issues.apache.org/jira/browse/FLINK-11819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787428#comment-16787428 ] Hequn Cheng commented on FLINK-11819: - [~hustclf] Thanks for bringing up the discussion. [~xccui][~hustclf] How about update the doc to make it more clear? The explanation from [~xccui] is a good example. Best, Hequn > Additional attribute for order by not support by flink sql, but told > supported in doc > - > > Key: FLINK-11819 > URL: https://issues.apache.org/jira/browse/FLINK-11819 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.7.2 >Reporter: Lifei Chen >Assignee: Lifei Chen >Priority: Major > Labels: pull-request-available > Fix For: 1.7.3 > > Original Estimate: 3h > Time Spent: 20m > Remaining Estimate: 2h 40m > > I am using flink v1.7.1, when I use flink sql to order by an attribute (not > time attribute), the error logs is as follow. > > sql: > {quote}"SELECT * FROM events order by tenantId" > {quote} > > error logs: > {quote}Exception in thread "main" org.apache.flink.table.api.TableException: > Cannot generate a valid execution plan for the given query: > FlinkLogicalSort(sort0=[$2], dir0=[ASC]) > FlinkLogicalNativeTableScan(table=[[_DataStreamTable_0]]) > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. 
> at > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:377) > at > org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302) > at > org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:814) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:305) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:248) > {quote} > > So as for now, only time attribute is supported by flink for command `order > by`, additional attribute is not supported yet, Is that right ? > If so, there is a mistake, indicated that other attribute except for `time > attribute` is supported . > related links: > [https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/sql.html#orderby--limit]
[jira] [Updated] (FLINK-10935) Implement KubeClient with Fabric8 Kubernetes clients
[ https://issues.apache.org/jira/browse/FLINK-10935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10935: --- Labels: pull-request-available (was: ) > Implement KubeClient with Fabric8 Kubernetes clients > > > Key: FLINK-10935 > URL: https://issues.apache.org/jira/browse/FLINK-10935 > Project: Flink > Issue Type: Sub-task > Components: Deployment / Kubernetes >Reporter: JIN SUN >Assignee: Chunhui Shi >Priority: Major > Labels: pull-request-available > > Implement KubeClient with Fabric8 Kubernetes clients and add tests
[GitHub] [flink] chunhui-shi commented on issue #7939: [FLINK-10935][ResourceManager] Implement KubeClient with Fabric8 Kubernetes clients
chunhui-shi commented on issue #7939: [FLINK-10935][ResourceManager] Implement KubeClient with Fabric8 Kubernetes clients URL: https://github.com/apache/flink/pull/7939#issuecomment-470764389 This is the second pull request coming after #7844.
[GitHub] [flink] chunhui-shi opened a new pull request #7939: [FLINK-10935][ResourceManager] Implement KubeClient with Fabric8 Kubernetes clients
chunhui-shi opened a new pull request #7939: [FLINK-10935][ResourceManager] Implement KubeClient with Fabric8 Kubernetes clients URL: https://github.com/apache/flink/pull/7939 ## What is the purpose of the change As part of native Kubernetes integration, introduce a client wrapper to access the Kubernetes service. ## Brief change log * Implement the KubeClient interface and its actual implementation. * Add a mock kube server which is derived from Fabric8's mock implementation. * Add unit tests of KubeClient. ## Does this pull request potentially affect one of the following parts: * Dependencies (does it add or upgrade a dependency): (no) * The public API, i.e., is any changed class annotated with @Public(Evolving): (no) * The serializers: (no) * The runtime per-record code paths (performance sensitive): (no) * Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) * The S3 file system connector: (no) ## Documentation * Does this pull request introduce a new feature? (yes) * If yes, how is the feature documented? (the documentation will be introduced in later pull requests)
[GitHub] [flink] flinkbot commented on issue #7939: [FLINK-10935][ResourceManager] Implement KubeClient with Fabric8 Kubernetes clients
flinkbot commented on issue #7939: [FLINK-10935][ResourceManager] Implement KubeClient with Fabric8 Kubernetes clients URL: https://github.com/apache/flink/pull/7939#issuecomment-470764365 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Commented] (FLINK-9685) Flink should support hostname-substitution for security.kerberos.login.principal
[ https://issues.apache.org/jira/browse/FLINK-9685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787326#comment-16787326 ] Rong Rong commented on FLINK-9685: -- Hi. I was wondering whether I fully understood the requirement for this JIRA. 1. do we want to support general substitution on principal keywords? e.g. REALM as well? 2. what kind of principal format we are going to support 3. what would be the good keyword substitution reserved for this purpose. Can we refer to the kerberos document for any possible improvement for 2/3? Regarding the implementation, should we differ this change or the replacement logic to the specific module instead, for example: {{HadoopModule}}. It seems to me that the only place should be using the principal is the security modules/contexts themselves, and each module might interpret this differently?? this is actually part of the overall effort to provide SeviceProvider pattern in FLINK-11589. > Flink should support hostname-substitution for > security.kerberos.login.principal > > > Key: FLINK-9685 > URL: https://issues.apache.org/jira/browse/FLINK-9685 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Ethan Li >Assignee: Aleksandr Salatich >Priority: Major > Labels: pull-request-available > Time Spent: 40m > Remaining Estimate: 0h > > [https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java#L83] > > We can have something like this > {code:java} > String rawPrincipal = > flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); > if (rawPrincipal != null) { >try { > rawPrincipal = rawPrincipal.replace("HOSTNAME", > InetAddress.getLocalHost().getCanonicalHostName()); >} catch (UnknownHostException e) { > LOG.error("Failed to replace HOSTNAME with localhost because {}", e); >} > } > this.principal = rawPrincipal; > {code} > So it will be easier to deploy flink to cluster. 
Instead of setting different > principal on every node, we can have the same principal > headless_user/HOSTNAME@DOMAIN .
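The substitution proposed in the issue can be sketched with plain JDK calls. The reserved `HOSTNAME` keyword and the fallback behavior follow the snippet quoted above; the class and method names here are illustrative, not Flink's actual `SecurityConfiguration` API.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Minimal sketch: replace a reserved "HOSTNAME" keyword in the configured
// Kerberos principal with this machine's canonical host name.
public class PrincipalSubstitution {
    static String resolvePrincipal(String rawPrincipal) {
        if (rawPrincipal == null) {
            return null;
        }
        try {
            return rawPrincipal.replace("HOSTNAME",
                InetAddress.getLocalHost().getCanonicalHostName());
        } catch (UnknownHostException e) {
            // Fall back to the raw value rather than failing configuration loading.
            return rawPrincipal;
        }
    }

    public static void main(String[] args) {
        // e.g. headless_user/HOSTNAME@EXAMPLE.COM -> headless_user/<canonical-host>@EXAMPLE.COM
        System.out.println(resolvePrincipal("headless_user/HOSTNAME@EXAMPLE.COM"));
    }
}
```

With this, the same configuration value can be shipped to every node of a cluster, which is the motivation stated in the issue.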
[jira] [Updated] (FLINK-11848) Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTION
[ https://issues.apache.org/jira/browse/FLINK-11848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Anderson updated FLINK-11848: --- Description: Recently we are doing some streaming jobs with apache flink. There are multiple KAFKA topics with a format as xx_yy-mm-dd. We used a topic regex pattern to let a consumer to consume those topics. However, if we delete some older topics, it seems that the metadata in consumer does not update properly so It still remember those outdated topic in its topic list, which leads to *UNKNOWN_TOPIC_EXCEPTION*. We must restart the consumer job to recovery. It seems to occur in producer as well. Any idea to solve this problem? Thank you very much! was: Recently we are doing some streaming jobs with apache flink. There are multiple KAFKA topics with a format as xx_yy-mm-dd. We used a topic regex pattern to let a consumer to consume those topics. However, if we delete some older topics, it seems that the metadata in consumer does not update properly so It still remember those outdated topic in its topic list, which leads to *UNKNOWN_TOPIC_EXCEPTIOIN*. We must restart the consumer job to recovery. It seems to occur in producer as well. Any idea to solve this problem? Thank you very much! > Delete outdated kafka topics caused UNKNOWN_TOPIC_EXCEPTIION > > > Key: FLINK-11848 > URL: https://issues.apache.org/jira/browse/FLINK-11848 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.6.4 >Reporter: Shengnan YU >Priority: Major > > Recently we are doing some streaming jobs with apache flink. There are > multiple KAFKA topics with a format as xx_yy-mm-dd. We used a topic regex > pattern to let a consumer to consume those topics. However, if we delete some > older topics, it seems that the metadata in consumer does not update properly > so It still remember those outdated topic in its topic list, which leads to > *UNKNOWN_TOPIC_EXCEPTION*. 
We must restart the consumer job to recovery. It > seems to occur in producer as well. Any idea to solve this problem? Thank you > very much! > >
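The failure mode described above can be illustrated with plain collections. This is not Kafka's or Flink's actual consumer code, just a sketch of why a topic list matched once at startup keeps naming deleted topics, while re-evaluating the pattern against the broker's current topic list does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Self-contained illustration of stale topic metadata after a topic deletion.
public class StaleTopicListSketch {
    static List<String> matchTopics(Pattern pattern, List<String> liveTopics) {
        List<String> matched = new ArrayList<>();
        for (String topic : liveTopics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Topic names shaped like the xx_yy-mm-dd format from the report.
        Pattern pattern = Pattern.compile("events_\\d{4}-\\d{2}-\\d{2}");
        List<String> live = new ArrayList<>(
            List.of("events_2019-03-07", "events_2019-03-08"));

        List<String> cached = matchTopics(pattern, live); // snapshot taken at startup
        live.remove("events_2019-03-07");                 // an old topic gets deleted

        // The cached list still names the deleted topic; a fresh match does not.
        System.out.println(cached);
        System.out.println(matchTopics(pattern, live));
    }
}
```

Fetching against the cached, deleted topic is what surfaces as the UNKNOWN_TOPIC error until the job is restarted and the pattern is re-evaluated.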
[jira] [Closed] (FLINK-11851) ClusterEntrypoint provides wrong executor to HaServices
[ https://issues.apache.org/jira/browse/FLINK-11851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-11851. - Resolution: Fixed Fixed via master: 3c7ed148cf39fa81a18832f7774365d78c3af08c 1.8.0: b7d85c87195200d5cdccb30160d7f3cecd980de8 1.7.3: 215204ca2298eb6c2d71baef5d3ea1dce17d368c > ClusterEntrypoint provides wrong executor to HaServices > --- > > Key: FLINK-11851 > URL: https://issues.apache.org/jira/browse/FLINK-11851 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.7.2, 1.8.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.3, 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > > The {{ClusterEntrypoint}} provides the executor of the common {{RpcService}} > to the {{HighAvailabilityServices}} which uses the executor to run io > operations. In I/O heavy cases, this can block all {{RpcService}} threads and > make the {{RpcEndpoints}} running in the respective {{RpcService}} > unresponsive. > I suggest to introduce a dedicated I/O executor which is used for io heavy > operations.
[GitHub] [flink] asfgit merged pull request #7926: [BP-1.7][FLINK-11851] Introduce dedicated io executor for ClusterEntrypoint and MiniCluster
asfgit merged pull request #7926: [BP-1.7][FLINK-11851] Introduce dedicated io executor for ClusterEntrypoint and MiniCluster URL: https://github.com/apache/flink/pull/7926
[jira] [Resolved] (FLINK-11846) Duplicate job submission delete HA files
[ https://issues.apache.org/jira/browse/FLINK-11846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-11846. --- Resolution: Fixed Fix Version/s: (was: 1.8.0) Fixed via master: f2b0e494e6089ccda488d6fa69cdb135a4213cf9 1.8.0: 813286bc3b27068aeb7c8dc2b98b4b6c96fbc8fb > Duplicate job submission delete HA files > > > Key: FLINK-11846 > URL: https://issues.apache.org/jira/browse/FLINK-11846 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.8.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Due to changes for FLINK-11383, the {{Dispatcher}} now delete HA files if the > client submits twice a job. A duplicate job submission should, however, > simply be rejected but not cause that HA files are being deleted.
[jira] [Resolved] (FLINK-11850) ZooKeeperHaServicesTest#testSimpleCloseAndCleanupAllData fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-11850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-11850. --- Resolution: Fixed Fixed via master: b464df2e5728bebee45dde7410ddb4f5ff1e6e2f 1.8.0: 2f765e7b47f9acfd7c3f538cbdb5e560a2f41a3b > ZooKeeperHaServicesTest#testSimpleCloseAndCleanupAllData fails on Travis > > > Key: FLINK-11850 > URL: https://issues.apache.org/jira/browse/FLINK-11850 > Project: Flink > Issue Type: Test > Components: Runtime / Coordination, Tests >Reporter: Congxian Qiu(klion26) >Assignee: Till Rohrmann >Priority: Critical > Labels: pull-request-available, test-stability > Time Spent: 20m > Remaining Estimate: 0h > > org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServicesTest > 08:20:01.694 [ERROR] > testSimpleCloseAndCleanupAllData(org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServicesTest) > Time elapsed: 0.076 s <<< ERROR! > org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = > NoNode for > /foo/bar/flink/default/leaderlatch/resource_manager_lock/_c_477d0124-92f3-4069-98aa-a71b8243250c-latch-00 > at > org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServicesTest.runCleanupTest(ZooKeeperHaServicesTest.java:203) > at > org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServicesTest.testSimpleCloseAndCleanupAllData(ZooKeeperHaServicesTest.java:128) > > Travis links: https://travis-ci.org/apache/flink/jobs/502960186
[GitHub] [flink] asfgit merged pull request #7925: [BP-1.8][FLINK-11851] Introduce dedicated io executor for ClusterEntrypoint and MiniCluster
asfgit merged pull request #7925: [BP-1.8][FLINK-11851] Introduce dedicated io executor for ClusterEntrypoint and MiniCluster URL: https://github.com/apache/flink/pull/7925
[GitHub] [flink] asfgit merged pull request #7919: [BP-1.8][FLINK-11846] Don't delete HA job files in case of duplicate job submission
asfgit merged pull request #7919: [BP-1.8][FLINK-11846] Don't delete HA job files in case of duplicate job submission URL: https://github.com/apache/flink/pull/7919
[GitHub] [flink] asfgit merged pull request #7929: [BP-1.8][FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode
asfgit merged pull request #7929: [BP-1.8][FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode URL: https://github.com/apache/flink/pull/7929
[GitHub] [flink] asfgit merged pull request #7924: [FLINK-11851] Introduce dedicated io executor for ClusterEntrypoint and MiniCluster
asfgit merged pull request #7924: [FLINK-11851] Introduce dedicated io executor for ClusterEntrypoint and MiniCluster URL: https://github.com/apache/flink/pull/7924
[GitHub] [flink] asfgit merged pull request #7928: [FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode
asfgit merged pull request #7928: [FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode URL: https://github.com/apache/flink/pull/7928
[GitHub] [flink] asfgit merged pull request #7918: [FLINK-11846] Don't delete HA job files in case of duplicate job submission
asfgit merged pull request #7918: [FLINK-11846] Don't delete HA job files in case of duplicate job submission URL: https://github.com/apache/flink/pull/7918
[GitHub] [flink] pnowojski commented on a change in pull request #7713: [FLINK-10995][network] Copy intermediate serialization results only once for broadcast mode
pnowojski commented on a change in pull request #7713: [FLINK-10995][network] Copy intermediate serialization results only once for broadcast mode URL: https://github.com/apache/flink/pull/7713#discussion_r263563494 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java ## @@ -44,4 +52,67 @@ public BroadcastRecordWriter( public void emit(T record) throws IOException, InterruptedException { broadcastEmit(record); } + + @Override + public void broadcastEmit(T record) throws IOException, InterruptedException { Review comment: > Considering initiating BufferConsumer#currentReaderPosition to LatencyMarker.size() for other channels in BroadcastRecordWriter#randomEmit, I meant this way of getting marker size seems coupled with LatencyMarker currently. We don't have to hardcode `LatencyMarker.size()`. After all when we were about to set the offset, we have just serialized the record for `randomEmit()` to a fresh `BufferBuilder` :)
[GitHub] [flink] walterddr commented on a change in pull request #7860: [FLINK-9685] Adding support hostname-substitution for security.kerberos login.principal
walterddr commented on a change in pull request #7860: [FLINK-9685] Adding support hostname-substitution for security.kerberos login.principal URL: https://github.com/apache/flink/pull/7860#discussion_r263552559 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java ## @@ -80,13 +86,21 @@ public SecurityConfiguration(Configuration flinkConf, List securityModuleFactories) { this.isZkSaslDisable = flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE); this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB); - this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE); this.loginContextNames = parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS)); this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME); this.zkLoginContextName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME); this.securityModuleFactories = Collections.unmodifiableList(securityModuleFactories); this.flinkConfig = checkNotNull(flinkConf); + String rawPrincipal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); Review comment: I am not sure this will work. I think we will have to make this as part of the `getPrincipal()` method, because it will be called via `SecurityUtils.install` in host, JM and TM?
[GitHub] [flink] walterddr commented on a change in pull request #7860: [FLINK-9685] Adding support hostname-substitution for security.kerberos login.principal
walterddr commented on a change in pull request #7860: [FLINK-9685] Adding support hostname-substitution for security.kerberos login.principal URL: https://github.com/apache/flink/pull/7860#discussion_r263553448 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java ## @@ -80,13 +86,21 @@ public SecurityConfiguration(Configuration flinkConf, List securityModuleFactories) { this.isZkSaslDisable = flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE); this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB); - this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE); this.loginContextNames = parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS)); this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME); this.zkLoginContextName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME); this.securityModuleFactories = Collections.unmodifiableList(securityModuleFactories); this.flinkConfig = checkNotNull(flinkConf); + String rawPrincipal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL); + if (rawPrincipal != null) { + try { + rawPrincipal = rawPrincipal.replace("HOSTNAME", InetAddress.getLocalHost().getCanonicalHostName()); Review comment: In order to make dynamic replacement, I think we have to have some indicator on which of the 3 Kerberos principal is used: ``` user_name@REALM host/host_name@REALM service_name/host_name@REALM ``` and if we use `"HOSTNAME"` as reserved keyword we also need to change documentations as well
[jira] [Commented] (FLINK-11798) Incorrect Kubernetes Documentation
[ https://issues.apache.org/jira/browse/FLINK-11798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787142#comment-16787142 ] Alex commented on FLINK-11798: -- [~pritesh-patel], unfortunately the docker entry point for public Flink 1.7 docker images doesn't pass additional arguments to the Task Manager. I wasn't aware of it, as I was using https://github.com/apache/flink/blob/release-1.7/flink-container/docker/docker-entrypoint.sh as the reference. There is a [PR|https://github.com/docker-flink/docker-flink/pull/68] to allow passing additional arguments to the Task Manager. But for the described option to take effect, either the Flink 1.7.2 docker images need to be rebuilt and retagged, or the updated scripts have to be included in a later Flink 1.7.x release. > Incorrect Kubernetes Documentation > -- > > Key: FLINK-11798 > URL: https://issues.apache.org/jira/browse/FLINK-11798 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.7.2 >Reporter: Pritesh Patel >Assignee: Alex >Priority: Major > > I have been trying to use the kubernetes session cluster manifests provided > in the documentation. The -Dtaskmanager.host flag doesn't seem to pass > through, meaning it uses the pod name as the host name. This won't work. > The current docs state the args should be: > > {code:java} > args: > - taskmanager > - "-Dtaskmanager.host=$(K8S_POD_IP)" > {code} > > I did manage to get it to work by using this manifest for the taskmanager > instead. This wasted a lot of time as it was very hard to find. > {code:java} > args: > - taskmanager.sh > - -Dtaskmanager.host=$(K8S_POD_IP) > - -Djobmanager.rpc.address=$(JOB_MANAGER_RPC_ADDRESS) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] walterddr commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
walterddr commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials URL: https://github.com/apache/flink/pull/7895#discussion_r263529807 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ## @@ -58,4 +87,84 @@ public void testDeleteApplicationFiles() throws Exception { assertThat(files.count(), equalTo(0L)); } } + + @Test + public void testCreateTaskExecutorContext() throws Exception { Review comment: This is quite involved; I was wondering if it would be better to have a separate util method to simply test this specific piece of removing AM_RM_TOKEN from the credential factory, and/or move it to the `flink-yarn-test` package for more comprehensive testing? E.g. add another assertion: ``` Assert.assertThat( "AMRMToken", taskManagerRunsWithKerberos, Matchers.is(false)); ``` in `YARNSessionFIFOSecuredITCase`?
[GitHub] [flink] walterddr commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
walterddr commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials URL: https://github.com/apache/flink/pull/7895#discussion_r263528664 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ## @@ -565,7 +565,20 @@ static ContainerLaunchContext createTaskExecutorContext( new File(fileLocation), HadoopUtils.getHadoopConfiguration(flinkConfig)); - cred.writeTokenStorageToStream(dob); + // Filter out AMRMToken before setting the tokens to the TaskManager container context. + Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens"); + Credentials taskManagerCred = new Credentials(); + final Text amRmTokenKind = new Text("YARN_AM_RM_TOKEN"); Review comment: Can we refactor `YARN_AM_RM_TOKEN` into a `static final` field of the Utils class? It will also be used in tests.
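The refactoring suggested here, hoisting the token kind into a shared constant and filtering it out of the credentials, could look roughly as follows. Hadoop's `Credentials`/`Text` types are replaced by a plain `Map` stand-in so the sketch stays self-contained; the class, constant, and method names are hypothetical (in Flink the constant would live on `org.apache.flink.yarn.Utils`):

```java
import java.util.HashMap;
import java.util.Map;

class CredentialFilter {

    // Shared constant per the review suggestion, so production code and
    // tests reference the same token kind string.
    static final String AM_RM_TOKEN_KIND = "YARN_AM_RM_TOKEN";

    // Copy every token except the AM/RM token into a fresh credential map.
    // In the real code this would iterate Hadoop's Credentials#getAllTokens().
    static Map<String, String> filterAmRmToken(Map<String, String> tokens) {
        Map<String, String> filtered = new HashMap<>();
        for (Map.Entry<String, String> entry : tokens.entrySet()) {
            if (!AM_RM_TOKEN_KIND.equals(entry.getKey())) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }
}
```

With the constant extracted, a test can assert directly that `AM_RM_TOKEN_KIND` is absent from the TaskManager's credentials instead of hard-coding the string in two places.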
[GitHub] [flink] walterddr commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
walterddr commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials URL: https://github.com/apache/flink/pull/7895#discussion_r263529202 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ## @@ -18,25 +18,54 @@ package org.apache.flink.yarn; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.util.TestLogger; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.File; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; Review comment: just curious, @klion26 is there any concrete discussion that mockito is introducing instability in the flink-yarn / filnk-yarn-test 
package?
[GitHub] [flink] tillrohrmann commented on issue #7928: [FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode
tillrohrmann commented on issue #7928: [FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode URL: https://github.com/apache/flink/pull/7928#issuecomment-470638988 Thanks for the review @TisonKun and @zentol. Addressing Chesnay's last comment and then merging this PR.
[GitHub] [flink] tillrohrmann commented on a change in pull request #7928: [FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode
tillrohrmann commented on a change in pull request #7928: [FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode URL: https://github.com/apache/flink/pull/7928#discussion_r263508836 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java ## @@ -251,7 +251,20 @@ private void cleanupZooKeeperPaths() throws Exception { private void deleteOwnedZNode() throws Exception { // delete the HA_CLUSTER_ID znode which is owned by this cluster - client.delete().deletingChildrenIfNeeded().forPath("/"); + + // Since we are using Curator version 2.12 there is a bug in deleting the children + // if there is a concurrent delete operation. Therefore we need to add this retry + // logic. See https://issues.apache.org/jira/browse/CURATOR-430 for more information. + // The retry logic can be removed once we upgrade to Curator version >= 4.0.1. + boolean zNodeDeleted = false; + while (!zNodeDeleted) { + try { + client.delete().deletingChildrenIfNeeded().forPath("/"); + zNodeDeleted = true; + } catch (KeeperException.NoNodeException ignored) { + // concurrent delete operation. Try again. Review comment: True, will add it.
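Stripped of Curator specifics, the retry workaround in the diff above can be sketched as below. `DeleteOp` and `RuntimeException` are stand-ins for Curator's delete builder and `KeeperException.NoNodeException` (Curator is not assumed to be on the classpath); the loop structure mirrors the PR:

```java
class RetryingDelete {

    @FunctionalInterface
    interface DeleteOp {
        void run() throws Exception;
    }

    // Keep retrying the recursive delete until it completes without hitting the
    // concurrent-child-deletion error described in CURATOR-430. In the real code
    // the caught type is KeeperException.NoNodeException.
    static void deleteWithRetry(DeleteOp delete) throws Exception {
        boolean zNodeDeleted = false;
        while (!zNodeDeleted) {
            try {
                delete.run();
                zNodeDeleted = true;
            } catch (RuntimeException ignored) {
                // a concurrent delete removed a child under us; try again
            }
        }
    }
}
```

The loop terminates because each retry observes strictly fewer children: once all children are gone, the delete of the parent either succeeds or the parent itself is already deleted, which the real code treats the same way.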
[GitHub] [flink] tillrohrmann commented on issue #7924: [FLINK-11851] Introduce dedicated io executor for ClusterEntrypoint and MiniCluster
tillrohrmann commented on issue #7924: [FLINK-11851] Introduce dedicated io executor for ClusterEntrypoint and MiniCluster URL: https://github.com/apache/flink/pull/7924#issuecomment-470637680 Thanks for the review @StefanRRichter and @jgrier. Merging this PR.
[GitHub] [flink] tillrohrmann commented on issue #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall
tillrohrmann commented on issue #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall URL: https://github.com/apache/flink/pull/7935#issuecomment-470634979 Thanks for the review @zentol. I think you are right that we should also fix the `RevokeLeadershipCall`. I've added a fixup and addressed your comments.
[GitHub] [flink] connord-stripe commented on issue #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API …
connord-stripe commented on issue #7168: [FLINK-6756][DataStream API] Provide Rich AsyncFunction to Scala API … URL: https://github.com/apache/flink/pull/7168#issuecomment-470629861 Neat. Thanks!
[GitHub] [flink] azagrebin commented on issue #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
azagrebin commented on issue #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186) URL: https://github.com/apache/flink/pull/7938#issuecomment-470629769 cc @QiLuo-BD @zhijiangW @pnowojski
[GitHub] [flink] azagrebin commented on issue #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions
azagrebin commented on issue #7186: [FLINK-10941] Keep slots which contain unconsumed result partitions URL: https://github.com/apache/flink/pull/7186#issuecomment-470629679 I agree with @zhijiangW. We discussed it also with @pnowojski: we might not merge it into 1.8.0 right away because we want to give it some exposure on running tests for a couple of weeks, but the fix is still useful for subsequent minor releases of 1.8 and e.g. 1.7. Thinking more about delaying the release until the moment of channel closing: in the happy scenario, when the `EndOfPartitionEvent` comes, there will probably not be much left to close apart from file handles. Buffers are also pretty much flushed to netty and either already freed by netty or will be. Sorry for the confusion; actually, the current approach in the PR is simpler than introducing one more `isClosed` state. @QiLuo-BD I would suggest a couple more changes apart from using `Future` in the task executor gateway, commented on by @zhijiangW. The task executor gateway could use `NetworkEnvironment.PartitionManager` to check for still-registered, unreleased partitions, since it is the central point for managing partitions. It would also simplify how this is queried now, going through `TaskSlot` and lingering `Tasks`. Eventually, `NetworkEnvironment` becomes a `ShuffleService`, which could decide one way or another whether the producer can be released. We still make the producer task executor depend on the consumer task executor; though we do not see any problems with this now, I think it is still safer to introduce an option as a feature flag. By default, the task executor will wait for consumers, as this PR suggests, but users can always fall back to the previous behaviour using the option. I created a co-authored PR #7938 on top of your PR with those changes rebased on the fresh master. You could review it and we can decide further.
[GitHub] [flink] azagrebin commented on issue #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
azagrebin commented on issue #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186) URL: https://github.com/apache/flink/pull/7938#issuecomment-470629074 @flinkbot attention @pnowojski
[GitHub] [flink] flinkbot commented on issue #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
flinkbot commented on issue #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186) URL: https://github.com/apache/flink/pull/7938#issuecomment-470628733 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] azagrebin opened a new pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186)
azagrebin opened a new pull request #7938: [FLINK-10941] Keep slots which contain unconsumed result partitions (on top of #7186) URL: https://github.com/apache/flink/pull/7938 ## What is the purpose of the change More changes based on #7186. ## Brief change log The task executor gateway can use `NetworkEnvironment.PartitionManager` to check for still-registered, unreleased partitions, since it is the central point for managing partitions. It would also simplify how this is queried now, going through `TaskSlot` and lingering `Tasks`. Eventually, `NetworkEnvironment` becomes a `ShuffleService`, which could decide one way or another whether the producer can be released. We still make the producer task executor depend on the consumer task executor; though we do not see any problems with this now, I think it is still safer to introduce an option as a feature flag. By default, the task executor will wait for consumers, as this PR suggests, but users can always fall back to the previous behaviour using the option. ## Verifying this change unit tests, e2e tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable)
[GitHub] [flink] azagrebin commented on issue #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class
azagrebin commented on issue #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class URL: https://github.com/apache/flink/pull/7921#issuecomment-470619215 @flinkbot approve all
[GitHub] [flink] tillrohrmann commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall
tillrohrmann commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall URL: https://github.com/apache/flink/pull/7935#discussion_r263487660 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability.nonha.embedded; + +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; + +import static org.hamcrest.Matchers.is; + +/** + * Tests for the {@link EmbeddedLeaderService}. + */ +public class EmbeddedLeaderServiceTest extends TestLogger { + + /** +* Tests that the {@link EmbeddedLeaderService} can handle a concurrent grant +* leadership call and a shutdown. 
+*/ + @Test + public void testConcurrentGrantLeadershipAndShutdown() throws Exception { + final ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor(); + final EmbeddedLeaderService embeddedLeaderService = new EmbeddedLeaderService(executor); + + try { + final LeaderElectionService leaderElectionService = embeddedLeaderService.createLeaderElectionService(); + + final ArrayBlockingQueue offeredSessionIds = new ArrayBlockingQueue<>(1); + final TestingLeaderContender contender = new TestingLeaderContender(offeredSessionIds); + + leaderElectionService.start(contender); + leaderElectionService.stop(); + + if (executor.numQueuedRunnables() > 0) { Review comment: But I think you are right that we could do this check a little bit better.
[GitHub] [flink] tillrohrmann commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall
tillrohrmann commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall URL: https://github.com/apache/flink/pull/7935#discussion_r263483901 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java ## @@ -506,33 +515,28 @@ public void run() { private static class GrantLeadershipCall implements Runnable { - private final EmbeddedLeaderElectionService leaderElectionService; + private final LeaderContender contender; private final UUID leaderSessionId; private final Logger logger; GrantLeadershipCall( - EmbeddedLeaderElectionService leaderElectionService, + LeaderContender contender, UUID leaderSessionId, Logger logger) { - this.leaderElectionService = checkNotNull(leaderElectionService); + this.contender = checkNotNull(contender); this.leaderSessionId = checkNotNull(leaderSessionId); this.logger = checkNotNull(logger); } @Override public void run() { - leaderElectionService.isLeader = true; - - final LeaderContender contender = leaderElectionService.contender; - try { contender.grantLeadership(leaderSessionId); } catch (Throwable t) { logger.warn("Error granting leadership to contender", t); contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t)); - leaderElectionService.isLeader = false; Review comment: I kept the flag because if we removed it, then a user of `EmbeddedLeaderElectionService#hasLeadership` could get a false positive if it guessed the current leadership id by chance. It is true that we don't reset the `isLeader` flag to false in case of a failure. The assumption is that the `leaderContender` will stop the leader election service in case of a failure.
[GitHub] [flink] tillrohrmann commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall
tillrohrmann commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall URL: https://github.com/apache/flink/pull/7935#discussion_r263482371 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability.nonha.embedded; + +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; + +import static org.hamcrest.Matchers.is; + +/** + * Tests for the {@link EmbeddedLeaderService}. + */ +public class EmbeddedLeaderServiceTest extends TestLogger { + + /** +* Tests that the {@link EmbeddedLeaderService} can handle a concurrent grant +* leadership call and a shutdown. 
+*/ + @Test + public void testConcurrentGrantLeadershipAndShutdown() throws Exception { + final ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor(); + final EmbeddedLeaderService embeddedLeaderService = new EmbeddedLeaderService(executor); + + try { + final LeaderElectionService leaderElectionService = embeddedLeaderService.createLeaderElectionService(); + + final ArrayBlockingQueue offeredSessionIds = new ArrayBlockingQueue<>(1); + final TestingLeaderContender contender = new TestingLeaderContender(offeredSessionIds); + + leaderElectionService.start(contender); + leaderElectionService.stop(); + + if (executor.numQueuedRunnables() > 0) { Review comment: There are no safeguards, since it is actually an implementation detail how the `LeaderContender` is granted leadership. Also, that `start` directly sends a `GrantLeadershipCall` is an implementation detail on which we rely by not having this check. Strictly speaking, this check does not even guarantee that a runnable enqueued in the `ManuallyTriggeredScheduledExecutor` is the `GrantLeadershipCall` runnable. Therefore, I opted for this compromise.
[GitHub] [flink] tillrohrmann commented on issue #7918: [FLINK-11846] Don't delete HA job files in case of duplicate job submission
tillrohrmann commented on issue #7918: [FLINK-11846] Don't delete HA job files in case of duplicate job submission URL: https://github.com/apache/flink/pull/7918#issuecomment-470611600 Thanks for the review @TisonKun and @StefanRRichter. Merging this PR.
[GitHub] [flink] zentol commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall
zentol commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall URL: https://github.com/apache/flink/pull/7935#discussion_r263465103 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability.nonha.embedded; + +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; + +import static org.hamcrest.Matchers.is; + +/** + * Tests for the {@link EmbeddedLeaderService}. + */ +public class EmbeddedLeaderServiceTest extends TestLogger { + + /** +* Tests that the {@link EmbeddedLeaderService} can handle a concurrent grant +* leadership call and a shutdown. 
+*/ + @Test + public void testConcurrentGrantLeadershipAndShutdown() throws Exception { + final ManuallyTriggeredScheduledExecutor executor = new ManuallyTriggeredScheduledExecutor(); + final EmbeddedLeaderService embeddedLeaderService = new EmbeddedLeaderService(executor); + + try { + final LeaderElectionService leaderElectionService = embeddedLeaderService.createLeaderElectionService(); + + final ArrayBlockingQueue<UUID> offeredSessionIds = new ArrayBlockingQueue<>(1); + final TestingLeaderContender contender = new TestingLeaderContender(offeredSessionIds); + + leaderElectionService.start(contender); + leaderElectionService.stop(); + + if (executor.numQueuedRunnables() > 0) { Review comment: what safeguards are in place to ensure that this is usually true? I'd prefer introducing a CountDownLatch into the `ManuallyTriggeredScheduledExecutor` to allow waiting for X runnables. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
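The latch-based alternative the reviewer suggests could look roughly like the sketch below: the executor counts down a latch each time a runnable is queued, so a test can wait deterministically for X runnables instead of branching on `numQueuedRunnables() > 0`. The class and method names here are hypothetical, not the actual `ManuallyTriggeredScheduledExecutor` API.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;

// Sketch of a manually triggered executor that lets a test block until a
// given number of runnables has been queued, then trigger them by hand.
public class LatchedManualExecutor {
    private final Queue<Runnable> queued = new ArrayDeque<>();
    private final CountDownLatch queuedLatch;

    public LatchedManualExecutor(int expectedRunnables) {
        this.queuedLatch = new CountDownLatch(expectedRunnables);
    }

    public synchronized void execute(Runnable command) {
        queued.add(command);
        // signal one more queued runnable
        queuedLatch.countDown();
    }

    // blocks until the expected number of runnables has been queued
    public void awaitQueuedRunnables() throws InterruptedException {
        queuedLatch.await();
    }

    public synchronized void triggerAll() {
        Runnable next;
        while ((next = queued.poll()) != null) {
            next.run();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LatchedManualExecutor executor = new LatchedManualExecutor(1);
        final boolean[] ran = {false};
        executor.execute(() -> ran[0] = true);
        executor.awaitQueuedRunnables(); // returns immediately: one runnable queued
        executor.triggerAll();
        System.out.println(ran[0]);
    }
}
```

The test would call `awaitQueuedRunnables()` between `start()` and the trigger instead of inspecting the queue size, removing the timing assumption the reviewer is questioning.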
[GitHub] [flink] zentol commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall
zentol commented on a change in pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall URL: https://github.com/apache/flink/pull/7935#discussion_r263479048 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java ## @@ -506,33 +515,28 @@ public void run() { private static class GrantLeadershipCall implements Runnable { - private final EmbeddedLeaderElectionService leaderElectionService; + private final LeaderContender contender; private final UUID leaderSessionId; private final Logger logger; GrantLeadershipCall( - EmbeddedLeaderElectionService leaderElectionService, + LeaderContender contender, UUID leaderSessionId, Logger logger) { - this.leaderElectionService = checkNotNull(leaderElectionService); + this.contender = checkNotNull(contender); this.leaderSessionId = checkNotNull(leaderSessionId); this.logger = checkNotNull(logger); } @Override public void run() { - leaderElectionService.isLeader = true; - - final LeaderContender contender = leaderElectionService.contender; - try { contender.grantLeadership(leaderSessionId); } catch (Throwable t) { logger.warn("Error granting leadership to contender", t); contender.handleError(t instanceof Exception ? (Exception) t : new Exception(t)); - leaderElectionService.isLeader = false; Review comment: The flag is no longer reset if the grant fails. I'm wondering whether we need this flag at all. It's only being used in `EmbeddedLeaderElectionService#hasLeadership`, but shouldn't the `leaderSessionId` be sufficient for checking that? This seems especially true now that `updateLeader` updates both `isLeader` and `currentLeaderSessionId` right after another. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
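The reviewer's point that the `leaderSessionId` alone should suffice for `hasLeadership` could be sketched as below. The field and method names mirror the discussion but are assumptions, not the actual `EmbeddedLeaderService` code.

```java
import java.util.UUID;

// Sketch: leadership derived solely from the current leader session id,
// with no separate isLeader boolean to keep in sync.
public class SessionIdLeadership {
    private UUID currentLeaderSessionId; // null while nobody is leader

    public synchronized void updateLeader(UUID leaderSessionId) {
        this.currentLeaderSessionId = leaderSessionId;
    }

    public synchronized void revokeLeadership() {
        this.currentLeaderSessionId = null;
    }

    public synchronized boolean hasLeadership(UUID leaderSessionId) {
        // The session id comparison subsumes the boolean flag: it is only
        // true for the session that was actually granted leadership.
        return leaderSessionId != null && leaderSessionId.equals(currentLeaderSessionId);
    }

    public static void main(String[] args) {
        SessionIdLeadership service = new SessionIdLeadership();
        UUID id = UUID.randomUUID();
        System.out.println(service.hasLeadership(id)); // not granted yet
        service.updateLeader(id);
        System.out.println(service.hasLeadership(id)); // granted
        service.revokeLeadership();
        System.out.println(service.hasLeadership(id)); // revoked
    }
}
```

Collapsing the flag into the session id also removes the "flag not reset on failure" class of bug the reviewer found, since there is only one piece of state to update.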
[GitHub] [flink] flinkbot commented on issue #7937: [BP-1.7][FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall
flinkbot commented on issue #7937: [BP-1.7][FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall URL: https://github.com/apache/flink/pull/7937#issuecomment-470601948 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann opened a new pull request #7937: [BP-1.7][FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall
tillrohrmann opened a new pull request #7937: [BP-1.7][FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall URL: https://github.com/apache/flink/pull/7937 Backport of #7935 to `release-1.7`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #7936: [BP-1.8][FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall
flinkbot commented on issue #7936: [BP-1.8][FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall URL: https://github.com/apache/flink/pull/7936#issuecomment-470601303 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann opened a new pull request #7936: [BP-1.8][FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall
tillrohrmann opened a new pull request #7936: [BP-1.8][FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall URL: https://github.com/apache/flink/pull/7936 Backport of #7935 to `release-1.8`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall
flinkbot commented on issue #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall URL: https://github.com/apache/flink/pull/7935#issuecomment-470597573 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11855) Race condition in EmbeddedLeaderService
[ https://issues.apache.org/jira/browse/FLINK-11855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11855: --- Labels: pull-request-available (was: ) > Race condition in EmbeddedLeaderService > --- > > Key: FLINK-11855 > URL: https://issues.apache.org/jira/browse/FLINK-11855 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.7.2, 1.8.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Labels: pull-request-available > Fix For: 1.7.3, 1.8.0 > > > There is a race condition in the {{EmbeddedLeaderService}} which can occur if > the {{EmbeddedLeaderService}} is shut down before the {{GrantLeadershipCall}} > has been executed. In this case, the {{contender}} is nulled which leads to an NPE. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] tillrohrmann opened a new pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall
tillrohrmann opened a new pull request #7935: [FLINK-11855] Fix race condition in EmbeddedLeaderService#GrantLeadershipCall URL: https://github.com/apache/flink/pull/7935 ## What is the purpose of the change Fix the race condition between executing EmbeddedLeaderService#GrantLeadershipCall and a concurrent shutdown of the leader service by making GrantLeadershipCall not accessing mutable state outside of a lock. ## Verifying this change - Added `EmbeddedLeaderServiceTest#testConcurrentGrantLeadershipAndShutdown` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
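The race and the fix described in this PR can be reduced to a small sketch: a grant call that reads the mutable `contender` field at execution time can observe the `null` written by a concurrent shutdown, while a call that captures the contender at creation time (the approach taken here) cannot. All names below are illustrative, not the actual `EmbeddedLeaderService` code.

```java
import java.util.UUID;

// Reduction of the FLINK-11855 race: shutdown nulls the contender field
// before a queued GrantLeadershipCall runs.
public class GrantLeadershipRace {
    public interface LeaderContender {
        void grantLeadership(UUID sessionId);
    }

    private LeaderContender contender =
        sessionId -> System.out.println("granted " + sessionId);

    // Buggy variant: reads the mutable field when the runnable executes.
    public Runnable grantCallReadingField() {
        return () -> contender.grantLeadership(UUID.randomUUID());
    }

    // Fixed variant (the approach of this PR): capture the contender when
    // the call is created, before any shutdown can null the field.
    public Runnable grantCallCapturingContender() {
        LeaderContender captured = contender;
        return () -> captured.grantLeadership(UUID.randomUUID());
    }

    public void shutdown() {
        contender = null;
    }

    public static void main(String[] args) {
        GrantLeadershipRace service = new GrantLeadershipRace();
        Runnable buggy = service.grantCallReadingField();
        Runnable fixed = service.grantCallCapturingContender();
        service.shutdown(); // races ahead of the queued calls
        try {
            buggy.run();
        } catch (NullPointerException expected) {
            System.out.println("NPE from the buggy variant");
        }
        fixed.run(); // still works: the contender was captured eagerly
    }
}
```

This matches the diff in the review thread above, where `GrantLeadershipCall` now receives the `LeaderContender` in its constructor instead of dereferencing the election service's field inside `run()`.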
[GitHub] [flink] zentol commented on a change in pull request #7928: [FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode
zentol commented on a change in pull request #7928: [FLINK-11850][zk] Tolerate concurrent child deletions when deleting owned zNode URL: https://github.com/apache/flink/pull/7928#discussion_r263450942 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java ## @@ -251,7 +251,20 @@ private void cleanupZooKeeperPaths() throws Exception { private void deleteOwnedZNode() throws Exception { // delete the HA_CLUSTER_ID znode which is owned by this cluster - client.delete().deletingChildrenIfNeeded().forPath("/"); + + // Since we are using Curator version 2.12 there is a bug in deleting the children + // if there is a concurrent delete operation. Therefore we need to add this retry + // logic. See https://issues.apache.org/jira/browse/CURATOR-430 for more information. + // The retry logic can be removed once we upgrade to Curator version >= 4.0.1. + boolean zNodeDeleted = false; + while (!zNodeDeleted) { + try { + client.delete().deletingChildrenIfNeeded().forPath("/"); + zNodeDeleted = true; + } catch (KeeperException.NoNodeException ignored) { + // concurrent delete operation. Try again. Review comment: we could log this on debug just in case. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
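The retry pattern in the diff above, with the debug log the reviewer asked for, can be shown in a dependency-free form. A fake `ZNodeDeleter` stands in for Curator's `client.delete().deletingChildrenIfNeeded().forPath("/")` and a plain nested exception for `KeeperException.NoNodeException`; both are assumptions made to keep the example self-contained.

```java
// Sketch of the CURATOR-430 workaround: retry the recursive delete when a
// concurrent child deletion makes it fail with NoNodeException.
public class RetryingDelete {
    public static class NoNodeException extends Exception {}

    public interface ZNodeDeleter {
        void delete() throws NoNodeException;
    }

    public static void deleteOwnedZNode(ZNodeDeleter deleter) {
        boolean zNodeDeleted = false;
        while (!zNodeDeleted) {
            try {
                deleter.delete();
                zNodeDeleted = true;
            } catch (NoNodeException ignored) {
                // concurrent delete operation; log at debug and try again
                System.out.println("DEBUG: concurrent child deletion detected, retrying");
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Fails once with NoNodeException, then succeeds; the loop must retry.
        deleteOwnedZNode(() -> {
            if (attempts[0]++ == 0) {
                throw new NoNodeException();
            }
        });
        System.out.println("attempts=" + attempts[0]);
    }
}
```

In the real code the log statement would go through the class's SLF4J `LOG.debug(...)` rather than stdout, per the review suggestion.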
[jira] [Assigned] (FLINK-11798) Incorrect Kubernetes Documentation
[ https://issues.apache.org/jira/browse/FLINK-11798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alex reassigned FLINK-11798: Assignee: Alex > Incorrect Kubernetes Documentation > -- > > Key: FLINK-11798 > URL: https://issues.apache.org/jira/browse/FLINK-11798 > Project: Flink > Issue Type: Bug > Components: Deployment / Kubernetes >Affects Versions: 1.7.2 >Reporter: Pritesh Patel >Assignee: Alex >Priority: Major > > I have been trying to use the kubernetes session cluster manifests provided > in the documentation. The -Dtaskmanager.host flag doesn't seem to pass > through, meaning it uses the pod name as the host name. This won't work. > The current docs state the args should be: > > {code:java} > args: > - taskmanager > - "-Dtaskmanager.host=$(K8S_POD_IP)" > {code} > > I did manage to get it to work by using this manifest for the taskmanager > instead. This wasted a lot of time, as it was very hard to find. > {code:java} > args: > - taskmanager.sh > - -Dtaskmanager.host=$(K8S_POD_IP) > - -Djobmanager.rpc.address=$(JOB_MANAGER_RPC_ADDRESS) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] desiam commented on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] Expose RestHighLevelClient to allow for custom sniffing
desiam commented on issue #7861: [FLINK-11747] [ Connectors / ElasticSearch] Expose RestHighLevelClient to allow for custom sniffing URL: https://github.com/apache/flink/pull/7861#issuecomment-470579574 @tzulitai If it's difficult to expose the RestHighLevelClient, we may look into creating our own implementation of the Elasticsearch 6 ElasticsearchSink. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11855) Race condition in EmbeddedLeaderService
[ https://issues.apache.org/jira/browse/FLINK-11855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-11855: -- Priority: Major (was: Critical) > Race condition in EmbeddedLeaderService > --- > > Key: FLINK-11855 > URL: https://issues.apache.org/jira/browse/FLINK-11855 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.7.2, 1.8.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Major > Fix For: 1.7.3, 1.8.0 > > > There is a race condition in the {{EmbeddedLeaderService}} which can occur if > the {{EmbeddedLeaderService}} is shut down before the {{GrantLeadershipCall}} > has been executed. In this case, the {{contender}} is nulled which leads to an NPE. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11855) Race condition in EmbeddedLeaderService
Till Rohrmann created FLINK-11855: - Summary: Race condition in EmbeddedLeaderService Key: FLINK-11855 URL: https://issues.apache.org/jira/browse/FLINK-11855 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.7.2, 1.8.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 1.7.3, 1.8.0 There is a race condition in the {{EmbeddedLeaderService}} which can occur if the {{EmbeddedLeaderService}} is shut down before the {{GrantLeadershipCall}} has been executed. In this case, the {{contender}} is nulled which leads to an NPE. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] azagrebin commented on a change in pull request #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class
azagrebin commented on a change in pull request #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class URL: https://github.com/apache/flink/pull/7921#discussion_r263432284 ## File path: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java ## @@ -71,32 +73,61 @@ } /** +* @deprecated use {@link TtlTimeCharacteristic} to avoid class name clash +* with org.apache.flink.streaming.api.TimeCharacteristic. +* * This option configures time scale to use for ttl. */ + @Deprecated public enum TimeCharacteristic { /** Processing time, see also TimeCharacteristic.ProcessingTime. */ ProcessingTime } + /** +* This option configures time scale to use for ttl. +*/ + public enum TtlTimeCharacteristic { + /** Processing time, see also org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime. */ + ProcessingTime + } + private final UpdateType updateType; private final StateVisibility stateVisibility; - private final TimeCharacteristic timeCharacteristic; + @Deprecated + private TimeCharacteristic timeCharacteristic; + private TtlTimeCharacteristic ttlTimeCharacteristic; private final Time ttl; private final CleanupStrategies cleanupStrategies; + @Deprecated Review comment: this is private, it can be just removed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class
azagrebin commented on a change in pull request #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class URL: https://github.com/apache/flink/pull/7921#discussion_r263434298 ## File path: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java ## @@ -205,15 +247,26 @@ public Builder neverReturnExpired() { * * @param timeCharacteristic The time characteristic configures time scale to use for ttl. */ + @Deprecated Review comment: when we deprecate, we should also say what to use instead in the doc comment, as was done for `FoldingState`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
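The reviewer's convention could look like the sketch below: a deprecated setter whose `@deprecated` javadoc names the replacement and delegates to it. The class and method names are illustrative, not the actual `StateTtlConfig.Builder` API.

```java
// Sketch: deprecating a method while pointing callers at its replacement
// in the @deprecated javadoc tag.
public class DeprecationDocExample {
    public enum TtlTimeCharacteristic { ProcessingTime }

    private TtlTimeCharacteristic ttlTimeCharacteristic = TtlTimeCharacteristic.ProcessingTime;

    /**
     * Sets the time characteristic.
     *
     * @deprecated Use {@link #setTtlTimeCharacteristic(TtlTimeCharacteristic)} instead,
     *             which avoids the name clash with
     *             org.apache.flink.streaming.api.TimeCharacteristic.
     */
    @Deprecated
    public void setTimeCharacteristic(TtlTimeCharacteristic characteristic) {
        // delegate so both entry points share one code path
        setTtlTimeCharacteristic(characteristic);
    }

    public void setTtlTimeCharacteristic(TtlTimeCharacteristic characteristic) {
        this.ttlTimeCharacteristic = characteristic;
    }

    public TtlTimeCharacteristic getTtlTimeCharacteristic() {
        return ttlTimeCharacteristic;
    }

    public static void main(String[] args) {
        DeprecationDocExample cfg = new DeprecationDocExample();
        cfg.setTimeCharacteristic(TtlTimeCharacteristic.ProcessingTime);
        System.out.println(cfg.getTtlTimeCharacteristic());
    }
}
```

Delegating from the deprecated method to the new one also keeps the two fields from drifting apart, which touches the other review comment about dropping the old field entirely.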
[GitHub] [flink] azagrebin commented on a change in pull request #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class
azagrebin commented on a change in pull request #7921: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class URL: https://github.com/apache/flink/pull/7921#discussion_r263432469 ## File path: flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java ## @@ -71,32 +73,61 @@ } /** +* @deprecated use {@link TtlTimeCharacteristic} to avoid class name clash +* with org.apache.flink.streaming.api.TimeCharacteristic. +* * This option configures time scale to use for ttl. */ + @Deprecated public enum TimeCharacteristic { /** Processing time, see also TimeCharacteristic.ProcessingTime. */ ProcessingTime } + /** +* This option configures time scale to use for ttl. +*/ + public enum TtlTimeCharacteristic { + /** Processing time, see also org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime. */ + ProcessingTime + } + private final UpdateType updateType; private final StateVisibility stateVisibility; - private final TimeCharacteristic timeCharacteristic; + @Deprecated + private TimeCharacteristic timeCharacteristic; Review comment: could we get rid of this field, also in Builder? `TimeCharacteristic` can be always converted into `TtlTimeCharacteristic` and back in relevant methods. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services