[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#discussion_r263989639 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java ## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.compression.BlockCompressor; +import org.apache.flink.table.runtime.compression.BlockDecompressor; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Compressed block channel reader provides a scenario where MemorySegment must be maintained. 
+ */ +public class CompressedBlockChannelReader + implements BlockChannelReader, RequestDoneCallback, BufferRecycler { + + private final LinkedBlockingQueue blockQueue; + private final boolean copyCompress; + private final BlockDecompressor decompressor; + private final BufferFileReader reader; + private final AtomicReference cause; + private final LinkedBlockingQueue retBuffers = new LinkedBlockingQueue<>(); + + private byte[] buf; + private ByteBuffer bufWrapper; + private int offset; + private int len; + + public CompressedBlockChannelReader( + IOManager ioManager, + ID channel, + LinkedBlockingQueue blockQueue, + BlockCompressionFactory codecFactory, + int preferBlockSize, + int segmentSize) throws IOException { + this.reader = ioManager.createBufferFileReader(channel, this); + this.blockQueue = blockQueue; + copyCompress = preferBlockSize > segmentSize * 2; + int blockSize = copyCompress ? preferBlockSize : segmentSize; + this.decompressor = codecFactory.getDecompressor(); + cause = new AtomicReference<>(); + + if (copyCompress) { + this.buf = new byte[blockSize]; + this.bufWrapper = ByteBuffer.wrap(buf); + } + + BlockCompressor compressor = codecFactory.getCompressor(); + for (int i = 0; i < 2; i++) { + MemorySegment segment = MemorySegmentFactory.wrap(new byte[compressor.getMaxCompressedSize(blockSize)]); + reader.readInto(new NetworkBuffer(segment, this)); + } + } + + @Override + public void readBlock(MemorySegment segment) throws IOException { + if (cause.get() != null) { + throw cause.get(); + } + + if (copyCompress) { + int readOffset = 0; + int readLen = segment.size(); + + while (readLen > 0) { + int copy = Math.min(readLen, len - offset); + if (copy == 0) { + readBuffer(); + } else { + segment.put(readOffset, buf, offset, copy); +
[GitHub] [flink] KurtYoung merged pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
KurtYoung merged pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on issue #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
KurtYoung commented on issue #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#issuecomment-471146491 Looks like we also need to do some more refactoring after the merge. I have created https://issues.apache.org/jira/browse/FLINK-11864 to track the effort. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-11864) Let compressed channel reader/writer reuse the logic of AsynchronousFileIOChannel
Kurt Young created FLINK-11864: -- Summary: Let compressed channel reader/writer reuse the logic of AsynchronousFileIOChannel Key: FLINK-11864 URL: https://issues.apache.org/jira/browse/FLINK-11864 Project: Flink Issue Type: Improvement Reporter: Kurt Young This is a follow-up issue of [FLINK-11863|https://issues.apache.org/jira/browse/FLINK-11863]. The introduced `CompressedBlockChannelReader` and `CompressedBlockChannelWriter` should reuse the logic of `AsynchronousFileIOChannel` by extending it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials URL: https://github.com/apache/flink/pull/7895#discussion_r263984640 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ## @@ -58,4 +87,84 @@ public void testDeleteApplicationFiles() throws Exception { assertThat(files.count(), equalTo(0L)); } } + + @Test + public void testCreateTaskExecutorContext() throws Exception { Review comment: Yeah, I think it's sort of complicated as a unit test and does not have good separation of concerns. Actually, the test is extended from a test in legacy mode that only verifies the local resources of TaskManagers. It's a good idea to refactor it into a util method and make it available in `flink-yarn-tests`, but I still want to have a test for `Utils.createTaskExecutorContext`, so I prefer to separate the test into two and add an extra util method to verify tokens in `flink-yarn-tests`. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials URL: https://github.com/apache/flink/pull/7895#discussion_r263984640 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ## @@ -58,4 +87,84 @@ public void testDeleteApplicationFiles() throws Exception { assertThat(files.count(), equalTo(0L)); } } + + @Test + public void testCreateTaskExecutorContext() throws Exception { Review comment: Yeah, I think it's sort of complicated as a unit test and does not have good separation of concerns. Actually, the test is extended from a test in legacy mode that only verifies the local resources of TaskManagers. It's a good idea to refactor it into a util method and make it available in `flink-yarn-tests`, but I still want to have a test for `Utils.createTaskExecutorContext`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials URL: https://github.com/apache/flink/pull/7895#discussion_r263984640 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ## @@ -58,4 +87,84 @@ public void testDeleteApplicationFiles() throws Exception { assertThat(files.count(), equalTo(0L)); } } + + @Test + public void testCreateTaskExecutorContext() throws Exception { Review comment: Yeah, I think it's sort of complicated as a unit test and does not have good separation of concerns. Actually, the test is extended from a test in legacy mode that only verifies the local resources of TaskManagers. It's a good idea to refactor it into a util method and move it to `flink-yarn-tests`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on issue #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
KurtYoung commented on issue #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#issuecomment-471141399 @JingsongLi Thanks for the review; I have addressed some of your comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials URL: https://github.com/apache/flink/pull/7895#discussion_r263984332 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ## @@ -565,7 +565,20 @@ static ContainerLaunchContext createTaskExecutorContext( new File(fileLocation), HadoopUtils.getHadoopConfiguration(flinkConfig)); - cred.writeTokenStorageToStream(dob); + // Filter out AMRMToken before setting the tokens to the TaskManager container context. + Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens"); + Credentials taskManagerCred = new Credentials(); + final Text amRmTokenKind = new Text("YARN_AM_RM_TOKEN"); Review comment: Good point, I will make it as a `static final` member. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials URL: https://github.com/apache/flink/pull/7895#discussion_r263984234 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java ## @@ -18,25 +18,54 @@ package org.apache.flink.yarn; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.util.TestLogger; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.File; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.stream.Stream; import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; Review comment: @klion26 Thanks for the input. 
Although I still think the mock should be relatively safe here, I will replace it with some real instances as you suggested. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#discussion_r263981676 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.compression.BlockCompressor; +import org.apache.flink.table.runtime.compression.BlockDecompressor; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * + */ +public class CompressedHeaderlessChannelReaderInputView + extends AbstractChannelReaderInputView + implements RequestDoneCallback, BufferRecycler { + + private final BlockDecompressor decompressor; + private final BufferFileReader reader; + private final MemorySegment uncompressedBuffer; + private final AtomicReference cause; + + private final LinkedBlockingQueue retBuffers = new LinkedBlockingQueue<>(); + + private int numBlocksRemaining; + private int currentSegmentLimit; + + public CompressedHeaderlessChannelReaderInputView( + FileIOChannel.ID id, + IOManager ioManager, + BlockCompressionFactory compressionCodecFactory, + int compressionBlockSize, + int numBlocks) throws IOException { + super(0); + this.numBlocksRemaining = numBlocks; + this.reader = ioManager.createBufferFileReader(id, 
this); + uncompressedBuffer = MemorySegmentFactory.wrap(new byte[compressionBlockSize]); + decompressor = compressionCodecFactory.getDecompressor(); + cause = new AtomicReference<>(); + + BlockCompressor compressor = compressionCodecFactory.getCompressor(); + for (int i = 0; i < 2; i++) { + MemorySegment segment = MemorySegmentFactory.wrap(new byte[compressor.getMaxCompressedSize( + compressionBlockSize)]); + reader.readInto(new NetworkBuffer(segment, this)); + } + } + + @Override + protected MemorySegment nextSegment(MemorySegment current) throws IOException { + if (cause.get() != null) { + throw cause.get(); + } + + // check for end-of-stream + if (this.numBlocksRemaining <= 0) { + this.reader.close(); + throw new EOFException(); + } + + try { + Buffer buffer; + while ((buffer = retBuffers.poll(1, TimeUnit.SECONDS)) == null) { + if (cause.get() != null) { Review comment: every loop should check closed too? same to AsynchronousBlockReader.getNextReturnedBlock This is an automated message from the Apache Git Service. To respond to the
[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#discussion_r263981141 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java ## @@ -160,6 +160,11 @@ protected final void advance() throws IOException { this.positionInSegment = this.headerLength; } + /** +* This method will run after advance(). Override it if you want to use it. +*/ + protected void afterAdvance() {} Review comment: Is afterAdvance() just to support setting the initial offset of the InputView? It's a little too heavy to add a method afterAdvance(). Consider overriding advance() in HeaderlessChannelReaderInputView and setting positionInSegment the first time? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#discussion_r263981754 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.util; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView; +import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.io.ChannelWithMeta; +import org.apache.flink.table.runtime.io.CompressedBlockChannelReader; +import org.apache.flink.table.runtime.io.CompressedBlockChannelWriter; +import org.apache.flink.table.runtime.io.CompressedHeaderlessChannelReaderInputView; +import org.apache.flink.table.runtime.io.CompressedHeaderlessChannelWriterOutputView; +import org.apache.flink.table.runtime.io.HeaderlessChannelWriterOutputView; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.flink.core.memory.MemorySegmentFactory.allocateUnpooledSegment; + +/** + * File channel util for runtime. + */ +public class FileChannelUtil { Review comment: some comment to method? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#discussion_r263981558 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.compression.BlockCompressor; +import org.apache.flink.table.runtime.compression.BlockDecompressor; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Review comment: some comment This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#discussion_r263981774 ## File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; +import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.compression.Lz4BlockCompressionFactory; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link CompressedHeaderlessChannelReaderInputView} and + * {@link CompressedHeaderlessChannelWriterOutputView}. 
+ */ +public class CompressedHeaderlessChannelTest { Review comment: Should we also add tests for CompressedBlockChannelReader and CompressedBlockChannelWriter? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#discussion_r263981511 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java ## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.compression.BlockCompressor; +import org.apache.flink.table.runtime.compression.BlockDecompressor; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Compressed block channel reader provides a scenario where MemorySegment must be maintained. 
+ */ +public class CompressedBlockChannelReader implements BlockChannelReader, + RequestDoneCallback, + BufferRecycler { + + private final LinkedBlockingQueue blockQueue; + + private final boolean copyCompress; + private final BlockDecompressor decompressor; + private final BufferFileReader reader; + private final AtomicReference cause; + + private final LinkedBlockingQueue retBuffers = new LinkedBlockingQueue<>(); + + private byte[] buf; + private ByteBuffer bufWrapper; + private int offset; + private int len; + + public CompressedBlockChannelReader( + IOManager ioManager, ID channel, + LinkedBlockingQueue blockQueue, + BlockCompressionFactory codecFactory, int preferBlockSize, int segmentSize) throws IOException { + this.reader = ioManager.createBufferFileReader(channel, this); + this.blockQueue = blockQueue; + copyCompress = preferBlockSize > segmentSize * 2; + int blockSize = copyCompress ? preferBlockSize : segmentSize; + this.decompressor = codecFactory.getDecompressor(); + cause = new AtomicReference<>(); + + if (copyCompress) { + this.buf = new byte[blockSize]; + this.bufWrapper = ByteBuffer.wrap(buf); + } + + BlockCompressor compressor = codecFactory.getCompressor(); + for (int i = 0; i < 2; i++) { + MemorySegment segment = MemorySegmentFactory.wrap(new byte[compressor.getMaxCompressedSize(blockSize)]); + reader.readInto(new NetworkBuffer(segment, this)); + } + } + + @Override + public void readBlock(MemorySegment segment) throws IOException { + if (cause.get() != null) { + throw cause.get(); + } + + if (copyCompress) { + int readOffset = 0; + int readLen = segment.size(); + + while (readLen > 0) { + int copy = Math.min(readLen, len - offset); + if (copy == 0) { + readBuffer(); + } else { + segment.put(readOffset, buf, offset, copy); + offset += copy; +
[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#discussion_r263981370 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java ## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.compression.BlockCompressor; +import org.apache.flink.table.runtime.compression.BlockDecompressor; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Compressed block channel reader provides a scenario where MemorySegment must be maintained. 
+ */ +public class CompressedBlockChannelReader implements BlockChannelReader, + RequestDoneCallback, + BufferRecycler { + + private final LinkedBlockingQueue blockQueue; + + private final boolean copyCompress; + private final BlockDecompressor decompressor; + private final BufferFileReader reader; + private final AtomicReference cause; + + private final LinkedBlockingQueue retBuffers = new LinkedBlockingQueue<>(); + + private byte[] buf; + private ByteBuffer bufWrapper; + private int offset; + private int len; + + public CompressedBlockChannelReader( + IOManager ioManager, ID channel, + LinkedBlockingQueue blockQueue, + BlockCompressionFactory codecFactory, int preferBlockSize, int segmentSize) throws IOException { + this.reader = ioManager.createBufferFileReader(channel, this); + this.blockQueue = blockQueue; + copyCompress = preferBlockSize > segmentSize * 2; Review comment: You could add a comment here explaining that, to avoid multiple copies, an extra intermediate buffer is allocated only when preferBlockSize > segmentSize * 2. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#discussion_r263981423 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelWriter.java ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter; +import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.table.runtime.compression.BlockCompressionFactory; +import org.apache.flink.table.runtime.compression.BlockCompressor; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Compressed block channel writer provides a scenario where MemorySegment must be maintained. + */ +public class CompressedBlockChannelWriter implements BlockChannelWriter, + BufferRecycler { + + private final LinkedBlockingQueue blockQueue; + private final LinkedBlockingQueue compressedBuffers = new LinkedBlockingQueue<>(); + + private final BufferFileWriter writer; + private final boolean copyCompress; + private final BlockCompressor compressor; + + private byte[] buf; + private ByteBuffer bufWrapper; + private int count; + + public CompressedBlockChannelWriter( + IOManager ioManager, ID channel, + LinkedBlockingQueue blockQueue, + BlockCompressionFactory codecFactory, int preferBlockSize, int segmentSize) throws IOException { + this.writer = ioManager.createBufferFileWriter(channel); + this.blockQueue = blockQueue; + copyCompress = preferBlockSize > segmentSize * 2; Review comment: same to BlockReader This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
flinkbot commented on issue #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#issuecomment-471133713 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11863) Introduce channel to read and write compressed data
[ https://issues.apache.org/jira/browse/FLINK-11863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11863: --- Labels: pull-request-available (was: ) > Introduce channel to read and write compressed data > --- > > Key: FLINK-11863 > URL: https://issues.apache.org/jira/browse/FLINK-11863 > Project: Flink > Issue Type: New Feature >Reporter: Kurt Young >Assignee: Kurt Young >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] KurtYoung commented on issue #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
KurtYoung commented on issue #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944#issuecomment-471133695 cc @JingsongLi This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung opened a new pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data
KurtYoung opened a new pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data URL: https://github.com/apache/flink/pull/7944 ## What is the purpose of the change Introduce compressed channel reader/writer, and compressed headerless channel reader/writer based on our `BlockCompressionFactory` ## Brief change log ## Verifying this change This change added tests and can be verified as follows: - Added unit test ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11539) Add TypeSerializerSnapshot for TraversableSerializer
[ https://issues.apache.org/jira/browse/FLINK-11539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788325#comment-16788325 ] Jürgen Kreileder commented on FLINK-11539: -- [~aljoscha] Does this come with an expected slow-down in stream graph building due to compilation? Jobs that took seconds to start with 1.7.2 take up to 15 minutes to start with 1.8-SNAPSHOT. All "Transforming ... " steps from StreamGraphGenerator seem to be slower, some taking up to 5 seconds now. > Add TypeSerializerSnapshot for TraversableSerializer > > > Key: FLINK-11539 > URL: https://issues.apache.org/jira/browse/FLINK-11539 > Project: Flink > Issue Type: Improvement > Components: API / Type Serialization System >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > > This will replace the deprecated {{TypeSerializerConfigSnapshot}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788315#comment-16788315 ] Jürgen Kreileder commented on FLINK-11420: -- Yep, concurrent access: {code:java} 2019-03-08 21:11:55,674 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Cleanup AsyncCheckpointRunnable for checkpoint 9 of Generate Finding Commands 377823 (1/1). 2019-03-08 21:11:55,680 INFO org.apache.flink.runtime.taskmanager.Task - Generate Finding Commands 377823 (1/1) (8bed70dd3de95f9c0c82daa19679a4be) switched from RUNNING to FAILED. java.lang.IllegalStateException: Concurrent access to KryoSerializer. Thread 1: Generate Finding Commands 377823 (1/1) , Thread 2: pool-138-thread-1 at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.enterExclusiveThread(KryoSerializer.java:630) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:242) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:90) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:230) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:90) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:90) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:230) at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:90) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:537) at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:506) at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99) at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:297) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:321) at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) {code} The 
slow-down in job submission is unrelated. I think it's caused by FLINK-11539. > Serialization of case classes containing a Map[String, Any] sometimes throws > ArrayIndexOutOfBoundsException > --- > > Key: FLINK-11420 > URL: https://issues.apache.org/jira/browse/FLINK-11420 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Assignee: Dawid
[jira] [Commented] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788302#comment-16788302 ] Dawid Wysakowicz commented on FLINK-11420: -- I wouldn't expect any of the changes introduced to fix this issue to have an impact on startup time. > Serialization of case classes containing a Map[String, Any] sometimes throws > ArrayIndexOutOfBoundsException > --- > > Key: FLINK-11420 > URL: https://issues.apache.org/jira/browse/FLINK-11420 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Assignee: Dawid Wysakowicz >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.3, 1.8.0 > > Time Spent: 1h > Remaining Estimate: 0h > > We frequently run into random ArrayIndexOutOfBounds exceptions when flink > tries to serialize Scala case classes containing a Map[String, Any] (Any > being String, Long, Int, or Boolean) with the FsStateBackend. (This probably > happens with any case class containing a type requiring Kryo, see this thread > for instance: > [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e]) > Disabling asynchronous snapshots seems to work around the problem, so maybe > something is not thread-safe in CaseClassSerializer. > Our objects look like this: > {code} > case class Event(timestamp: Long, [...], content: Map[String, Any] > case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any]) > {code} > I've looked at a few of the exceptions in a debugger. It always happens when > serializing the right-hand side a tuple from EnrichedEvent -> Event -> > content, e.g: 13 from ("foo", 13) or false from ("bar", false).
> Stacktrace: > {code:java} > java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0 > at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157) > at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822) > at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69) > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234) > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465) > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465) > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99) > at > org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311) > at > 
org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.base/java.lang.Thread.run(Thread.java:834){code} > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788296#comment-16788296 ] Jürgen Kreileder commented on FLINK-11654: -- I agree with [~cslotterback] that it is a usable workaround, but it basically means turning a formerly job-local UID into a UUID, which isn't very obvious to users. > Multiple transactional KafkaProducers writing to same cluster have clashing > transaction IDs > --- > > Key: FLINK-11654 > URL: https://issues.apache.org/jira/browse/FLINK-11654 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Priority: Major > Fix For: 1.9.0 > > > We run multiple jobs on a cluster which write a lot to the same Kafka topic > from identically named sinks. When EXACTLY_ONCE semantic is enabled for the > KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go > into a restart cycle. > Example exception from the Kafka log: > > {code:java} > [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing > append operation on partition finding-commands-dev-1-0 > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is > no longer valid. There is probably another producer with a newer epoch. 483 > (request epoch), 484 (server epoch) > {code} > The reason for this is the way FlinkKafkaProducer initializes the > TransactionalIdsGenerator: > The IDs are only guaranteed to be unique for a single Job. But they can clash > between different Jobs (and Clusters).
> > > {code:java} > --- > a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > +++ > b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > @@ -819,6 +819,7 @@ public class FlinkKafkaProducer > nextTransactionalIdHintState = > context.getOperatorStateStore().getUnionListState( > NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); > transactionalIdsGenerator = new TransactionalIdsGenerator( > + // the prefix probably should include job id and maybe cluster id > getRuntimeContext().getTaskName() + "-" + > ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(), > getRuntimeContext().getIndexOfThisSubtask(), > > getRuntimeContext().getNumberOfParallelSubtasks(),{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788274#comment-16788274 ] Tim commented on FLINK-11654: - I did try setting UID to a random 32-character hex string, but it made no difference to the `transaction.id`. Maybe I did something wrong. I'm on Flink 1.7.1, and running on a single machine with 12 TMs (and 1 slot per TM). > Multiple transactional KafkaProducers writing to same cluster have clashing > transaction IDs > --- > > Key: FLINK-11654 > URL: https://issues.apache.org/jira/browse/FLINK-11654 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Priority: Major > Fix For: 1.9.0 > > > We run multiple jobs on a cluster which write a lot to the same Kafka topic > from identically named sinks. When EXACTLY_ONCE semantic is enabled for the > KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go > into a restart cycle. > Example exception from the Kafka log: > > {code:java} > [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing > append operation on partition finding-commands-dev-1-0 > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is > no longer valid. There is probably another producer with a newer epoch. 483 > (request epoch), 484 (server epoch) > {code} > The reason for this is the way FlinkKafkaProducer initializes the > TransactionalIdsGenerator: > The IDs are only guaranteed to be unique for a single Job. But they can clash > between different Jobs (and Clusters). 
> > > {code:java} > --- > a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > +++ > b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > @@ -819,6 +819,7 @@ public class FlinkKafkaProducer > nextTransactionalIdHintState = > context.getOperatorStateStore().getUnionListState( > NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); > transactionalIdsGenerator = new TransactionalIdsGenerator( > + // the prefix probably should include job id and maybe cluster id > getRuntimeContext().getTaskName() + "-" + > ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(), > getRuntimeContext().getIndexOfThisSubtask(), > > getRuntimeContext().getNumberOfParallelSubtasks(),{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788272#comment-16788272 ] Jürgen Kreileder commented on FLINK-11420: -- Hi [~dawidwys], I'm just playing around a bit with 1.8 without restoring any state. I'll give DEBUG a try. Testing is a bit of a pain, though: the test jobs take around 10 minutes to actually start, whereas they took only seconds with 1.7. The culprit seems to be the new compilation code in TraversableSerializer. Is that an expected phenomenon? > Serialization of case classes containing a Map[String, Any] sometimes throws > ArrayIndexOutOfBoundsException > --- > > Key: FLINK-11420 > URL: https://issues.apache.org/jira/browse/FLINK-11420 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Assignee: Dawid Wysakowicz >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.3, 1.8.0 > > Time Spent: 1h > Remaining Estimate: 0h > > We frequently run into random ArrayIndexOutOfBounds exceptions when flink > tries to serialize Scala case classes containing a Map[String, Any] (Any > being String, Long, Int, or Boolean) with the FsStateBackend. (This probably > happens with any case class containing a type requiring Kryo, see this thread > for instance: > [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e]) > Disabling asynchronous snapshots seems to work around the problem, so maybe > something is not thread-safe in CaseClassSerializer. > Our objects look like this: > {code} > case class Event(timestamp: Long, [...], content: Map[String, Any] > case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any]) > {code} > I've looked at a few of the exceptions in a debugger.
It always happens when > serializing the right-hand side a tuple from EnrichedEvent -> Event -> > content, e.g: 13 from ("foo", 13) or false from ("bar", false). > Stacktrace: > {code:java} > java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0 > at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157) > at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822) > at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69) > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234) > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465) > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465) > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99) > at > org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287) > 
at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311) > at > org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.base/java.lang.Thread.run(Thread.java:834){code} > > > -- This message was sent by Atlassian JIRA
[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788251#comment-16788251 ] Dawid Wysakowicz commented on FLINK-11654: -- Adding to making this setting mandatory, we recently introduced (in 1.8) a flag that forces to set uids for all operators in a job. See [FLINK-11653] > Multiple transactional KafkaProducers writing to same cluster have clashing > transaction IDs > --- > > Key: FLINK-11654 > URL: https://issues.apache.org/jira/browse/FLINK-11654 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Priority: Major > Fix For: 1.9.0 > > > We run multiple jobs on a cluster which write a lot to the same Kafka topic > from identically named sinks. When EXACTLY_ONCE semantic is enabled for the > KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go > into a restart cycle. > Example exception from the Kafka log: > > {code:java} > [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing > append operation on partition finding-commands-dev-1-0 > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is > no longer valid. There is probably another producer with a newer epoch. 483 > (request epoch), 484 (server epoch) > {code} > The reason for this is the way FlinkKafkaProducer initializes the > TransactionalIdsGenerator: > The IDs are only guaranteed to be unique for a single Job. But they can clash > between different Jobs (and Clusters). 
> > > {code:java} > --- > a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > +++ > b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > @@ -819,6 +819,7 @@ public class FlinkKafkaProducer > nextTransactionalIdHintState = > context.getOperatorStateStore().getUnionListState( > NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); > transactionalIdsGenerator = new TransactionalIdsGenerator( > + // the prefix probably should include job id and maybe cluster id > getRuntimeContext().getTaskName() + "-" + > ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(), > getRuntimeContext().getIndexOfThisSubtask(), > > getRuntimeContext().getNumberOfParallelSubtasks(),{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788255#comment-16788255 ] Dawid Wysakowicz commented on FLINK-11420: -- Hi [~jkreileder] Have you tried enabling DEBUG logs? Have you seen exceptions claiming concurrent access to KryoSerializer? Another idea, are you trying to restore from checkpoint taken with a previous version? I just wonder if the checkpoint might be corrupted if it was taken when this bug was still present. [~srichter] What do you think? > Serialization of case classes containing a Map[String, Any] sometimes throws > ArrayIndexOutOfBoundsException > --- > > Key: FLINK-11420 > URL: https://issues.apache.org/jira/browse/FLINK-11420 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Assignee: Dawid Wysakowicz >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.3, 1.8.0 > > Time Spent: 1h > Remaining Estimate: 0h > > We frequently run into random ArrayIndexOutOfBounds exceptions when flink > tries to serialize Scala case classes containing a Map[String, Any] (Any > being String, Long, Int, or Boolean) with the FsStateBackend. (This probably > happens with any case class containing a type requiring Kryo, see this thread > for instance: > [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e]) > Disabling asynchronous snapshots seems to work around the problem, so maybe > something is not thread-safe in CaseClassSerializer. > Our objects look like this: > {code} > case class Event(timestamp: Long, [...], content: Map[String, Any] > case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any]) > {code} > I've looked at a few of the exceptions in a debugger. 
It always happens when > serializing the right-hand side a tuple from EnrichedEvent -> Event -> > content, e.g: 13 from ("foo", 13) or false from ("bar", false). > Stacktrace: > {code:java} > java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0 > at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157) > at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822) > at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69) > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234) > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465) > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465) > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > at > org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99) > at > org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42) > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287) > 
at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311) > at > org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.base/java.lang.Thread.run(Thread.java:834){code} > > > -- This message was sent by Atlassian JIRA
[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788251#comment-16788251 ] Dawid Wysakowicz edited comment on FLINK-11654 at 3/8/19 8:04 PM: -- Adding to making this setting mandatory, we recently introduced (in 1.9) a flag that forces to set uids for all operators in a job. See [FLINK-11653] was (Author: dawidwys): Adding to making this setting mandatory, we recently introduced (in 1.8) a flag that forces to set uids for all operators in a job. See [FLINK-11653] > Multiple transactional KafkaProducers writing to same cluster have clashing > transaction IDs > --- > > Key: FLINK-11654 > URL: https://issues.apache.org/jira/browse/FLINK-11654 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Priority: Major > Fix For: 1.9.0 > > > We run multiple jobs on a cluster which write a lot to the same Kafka topic > from identically named sinks. When EXACTLY_ONCE semantic is enabled for the > KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go > into a restart cycle. > Example exception from the Kafka log: > > {code:java} > [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing > append operation on partition finding-commands-dev-1-0 > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is > no longer valid. There is probably another producer with a newer epoch. 483 > (request epoch), 484 (server epoch) > {code} > The reason for this is the way FlinkKafkaProducer initializes the > TransactionalIdsGenerator: > The IDs are only guaranteed to be unique for a single Job. But they can clash > between different Jobs (and Clusters). 
> > > {code:java} > --- > a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > +++ > b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > @@ -819,6 +819,7 @@ public class FlinkKafkaProducer > nextTransactionalIdHintState = > context.getOperatorStateStore().getUnionListState( > NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); > transactionalIdsGenerator = new TransactionalIdsGenerator( > + // the prefix probably should include job id and maybe cluster id > getRuntimeContext().getTaskName() + "-" + > ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(), > getRuntimeContext().getIndexOfThisSubtask(), > > getRuntimeContext().getNumberOfParallelSubtasks(),{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11653) Add configuration to enforce custom UID's on datastream
[ https://issues.apache.org/jira/browse/FLINK-11653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz closed FLINK-11653. Resolution: Fixed Fix Version/s: 1.9.0 Implemented in 9c329484f9e8da58b3f0ac4e48348ba3e781dd08 > Add configuration to enforce custom UID's on datastream > --- > > Key: FLINK-11653 > URL: https://issues.apache.org/jira/browse/FLINK-11653 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Reporter: Seth Wiesman >Assignee: Seth Wiesman >Priority: Minor > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > Current best practice when deploying Flink applications to production is to > set a custom UID, using DataStream#uid, so jobs can resume from savepoints > even if the job graph has been modified. Flink should contain a > configuration that can allow users to fail submission if their program > contains an operator without a custom UID, enforcing best practices similarly > to #disableGenericTypes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788250#comment-16788250 ] Dawid Wysakowicz commented on FLINK-11654: -- The workaround is to set different uids https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/savepoints.html#assigning-operator-ids for the kafka producers. I would be against introducing an additional configuration parameter. This would introduce additional complexity not just in the code, but also it would introduce additional corner cases while restoring state for an operator. What should happen if a user tries to restore state after changing the {{TransactionIdSeed}}? I think the operator id can serve this use case very well. I agree though we should improve documentation around this topic. > Multiple transactional KafkaProducers writing to same cluster have clashing > transaction IDs > --- > > Key: FLINK-11654 > URL: https://issues.apache.org/jira/browse/FLINK-11654 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Priority: Major > Fix For: 1.9.0 > > > We run multiple jobs on a cluster which write a lot to the same Kafka topic > from identically named sinks. When EXACTLY_ONCE semantic is enabled for the > KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go > into a restart cycle. > Example exception from the Kafka log: > > {code:java} > [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing > append operation on partition finding-commands-dev-1-0 > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is > no longer valid. There is probably another producer with a newer epoch. 483 > (request epoch), 484 (server epoch) > {code} > The reason for this is the way FlinkKafkaProducer initializes the > TransactionalIdsGenerator: > The IDs are only guaranteed to be unique for a single Job.
But they can clash > between different Jobs (and Clusters). > > > {code:java} > --- > a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > +++ > b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > @@ -819,6 +819,7 @@ public class FlinkKafkaProducer > nextTransactionalIdHintState = > context.getOperatorStateStore().getUnionListState( > NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); > transactionalIdsGenerator = new TransactionalIdsGenerator( > + // the prefix probably should include job id and maybe cluster id > getRuntimeContext().getTaskName() + "-" + > ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(), > getRuntimeContext().getIndexOfThisSubtask(), > > getRuntimeContext().getNumberOfParallelSubtasks(),{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788247#comment-16788247 ] Chris Slotterback edited comment on FLINK-11654 at 3/8/19 7:59 PM: --- Setting the {{UID}} does work in my testing to avoid transactionId collisions, as long as it remains unique across *every* producer writing to that topic. In a multi-datacenter configuration, I had collisions occurring across multiple flink clusters. I do think that if setting the {{UID}} operator (or a seed) is the the accepted workaround, it could certainly be more obvious or even set as required for the users to configure to use {{Semantics.EXACTLY_ONCE}} producers was (Author: cslotterback): I do think that if setting the {{UID}} operator (or a seed) is the the accepted workaround, it could certainly be more obvious or even set as required for the users to configure to use {{Semantics.EXACTLY_ONCE}} producers > Multiple transactional KafkaProducers writing to same cluster have clashing > transaction IDs > --- > > Key: FLINK-11654 > URL: https://issues.apache.org/jira/browse/FLINK-11654 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Priority: Major > Fix For: 1.9.0 > > > We run multiple jobs on a cluster which write a lot to the same Kafka topic > from identically named sinks. When EXACTLY_ONCE semantic is enabled for the > KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go > into a restart cycle. > Example exception from the Kafka log: > > {code:java} > [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing > append operation on partition finding-commands-dev-1-0 > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is > no longer valid. There is probably another producer with a newer epoch. 
483 > (request epoch), 484 (server epoch) > {code} > The reason for this is the way FlinkKafkaProducer initializes the > TransactionalIdsGenerator: > The IDs are only guaranteed to be unique for a single Job. But they can clash > between different Jobs (and Clusters). > > > {code:java} > --- > a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > +++ > b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > @@ -819,6 +819,7 @@ public class FlinkKafkaProducer > nextTransactionalIdHintState = > context.getOperatorStateStore().getUnionListState( > NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); > transactionalIdsGenerator = new TransactionalIdsGenerator( > + // the prefix probably should include job id and maybe cluster id > getRuntimeContext().getTaskName() + "-" + > ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(), > getRuntimeContext().getIndexOfThisSubtask(), > > getRuntimeContext().getNumberOfParallelSubtasks(),{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788247#comment-16788247 ] Chris Slotterback commented on FLINK-11654: --- I do think that if setting the {{UID}} operator (or a seed) is the the accepted workaround, it could certainly be more obvious or even set as required for the users to configure to use {{Semantics.EXACTLY_ONCE}} producers > Multiple transactional KafkaProducers writing to same cluster have clashing > transaction IDs > --- > > Key: FLINK-11654 > URL: https://issues.apache.org/jira/browse/FLINK-11654 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Priority: Major > Fix For: 1.9.0 > > > We run multiple jobs on a cluster which write a lot to the same Kafka topic > from identically named sinks. When EXACTLY_ONCE semantic is enabled for the > KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go > into a restart cycle. > Example exception from the Kafka log: > > {code:java} > [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing > append operation on partition finding-commands-dev-1-0 > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is > no longer valid. There is probably another producer with a newer epoch. 483 > (request epoch), 484 (server epoch) > {code} > The reason for this is the way FlinkKafkaProducer initializes the > TransactionalIdsGenerator: > The IDs are only guaranteed to be unique for a single Job. But they can clash > between different Jobs (and Clusters). 
> > > {code:java} > --- > a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > +++ > b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > @@ -819,6 +819,7 @@ public class FlinkKafkaProducer > nextTransactionalIdHintState = > context.getOperatorStateStore().getUnionListState( > NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); > transactionalIdsGenerator = new TransactionalIdsGenerator( > + // the prefix probably should include job id and maybe cluster id > getRuntimeContext().getTaskName() + "-" + > ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(), > getRuntimeContext().getIndexOfThisSubtask(), > > getRuntimeContext().getNumberOfParallelSubtasks(),{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788239#comment-16788239 ] Tim commented on FLINK-11654: - It is not clear to me what the _recommended_ workaround is. Is it to name the task so that different jobs will end up with different transaction IDs? FWIW - I think the TransactionIdSeed is a good idea in that it makes it clear to the user that a token is used to uniquely identify the pool of KafkaProducers. I have not thought about if and how that would impact recovery and savepoints though. > Multiple transactional KafkaProducers writing to same cluster have clashing > transaction IDs > --- > > Key: FLINK-11654 > URL: https://issues.apache.org/jira/browse/FLINK-11654 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Priority: Major > Fix For: 1.9.0 > > > We run multiple jobs on a cluster which write a lot to the same Kafka topic > from identically named sinks. When EXACTLY_ONCE semantic is enabled for the > KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go > into a restart cycle. > Example exception from the Kafka log: > > {code:java} > [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing > append operation on partition finding-commands-dev-1-0 > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is > no longer valid. There is probably another producer with a newer epoch. 483 > (request epoch), 484 (server epoch) > {code} > The reason for this is the way FlinkKafkaProducer initializes the > TransactionalIdsGenerator: > The IDs are only guaranteed to be unique for a single Job. But they can clash > between different Jobs (and Clusters). 
> > > {code:java} > --- > a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > +++ > b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > @@ -819,6 +819,7 @@ public class FlinkKafkaProducer > nextTransactionalIdHintState = > context.getOperatorStateStore().getUnionListState( > NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); > transactionalIdsGenerator = new TransactionalIdsGenerator( > + // the prefix probably should include job id and maybe cluster id > getRuntimeContext().getTaskName() + "-" + > ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(), > getRuntimeContext().getIndexOfThisSubtask(), > > getRuntimeContext().getNumberOfParallelSubtasks(),{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788239#comment-16788239 ] Tim edited comment on FLINK-11654 at 3/8/19 7:40 PM: - It is not clear to me what the _recommended_ workaround is. Is it to name the task so that different jobs will end up with different transaction IDs? FWIW - I think the TransactionIdSeed is a good idea in that it makes it clear to the user that a token is used to uniquely identify the pool of KafkaProducers. I have not thought about if and how that would impact recovery and savepoints though. Also, adding my findings as well. http://mail-archives.apache.org/mod_mbox/flink-user/201903.mbox/%3C9E3F033F-0CB0-4659-A487-39BB728C4F01%40comcast.com%3E was (Author: victtim): It is not clear to me what the _recommended_ workaround is. Is it to name the task so that different jobs will end up with different transaction IDs? FWIW - I think the TransactionIdSeed is a good idea in that it makes it clear to the user that a token is used to uniquely identify the pool of KafkaProducers. I have not thought about if and how that would impact recovery and savepoints though. > Multiple transactional KafkaProducers writing to same cluster have clashing > transaction IDs > --- > > Key: FLINK-11654 > URL: https://issues.apache.org/jira/browse/FLINK-11654 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Priority: Major > Fix For: 1.9.0 > > > We run multiple jobs on a cluster which write a lot to the same Kafka topic > from identically named sinks. When EXACTLY_ONCE semantic is enabled for the > KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go > into a restart cycle. 
> Example exception from the Kafka log: > > {code:java} > [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing > append operation on partition finding-commands-dev-1-0 > (kafka.server.ReplicaManager) > org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is > no longer valid. There is probably another producer with a newer epoch. 483 > (request epoch), 484 (server epoch) > {code} > The reason for this is the way FlinkKafkaProducer initializes the > TransactionalIdsGenerator: > The IDs are only guaranteed to be unique for a single Job. But they can clash > between different Jobs (and Clusters). > > > {code:java} > --- > a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > +++ > b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java > @@ -819,6 +819,7 @@ public class FlinkKafkaProducer > nextTransactionalIdHintState = > context.getOperatorStateStore().getUnionListState( > NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); > transactionalIdsGenerator = new TransactionalIdsGenerator( > + // the prefix probably should include job id and maybe cluster id > getRuntimeContext().getTaskName() + "-" + > ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(), > getRuntimeContext().getIndexOfThisSubtask(), > > getRuntimeContext().getNumberOfParallelSubtasks(),{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
[ https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787991#comment-16787991 ] Padarn Wilson commented on FLINK-5479: -- Hi all - Is this still an open issue that anyone is working on? Was thinking of taking a crack at it, but thought I'd check here first. > Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions > -- > > Key: FLINK-5479 > URL: https://issues.apache.org/jira/browse/FLINK-5479 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka >Reporter: Tzu-Li (Gordon) Tai >Priority: Major > Fix For: 1.8.0 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html > Similar to what's happening to idle sources blocking watermark progression in > downstream operators (see FLINK-5017), the per-partition watermark mechanism > in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks > when a partition is idle. The watermark of idle partitions is always > {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions > of a consumer subtask will never proceed. > It's normally not a common case to have Kafka partitions not producing any > data, but it'll probably be good to handle this as well. I think we should > have a localized solution similar to FLINK-5017 for the per-partition > watermarks in {{AbstractFetcher}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r263821841 ## File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSubscriberFactoryForEmulator.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; +import io.grpc.ManagedChannel; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup; + +import java.io.IOException; + +/** + * A convenience PubSubSubscriberFactory that can be used to connect to a PubSub emulator. + * The PubSub emulators do not support SSL or Credentials and as such this SubscriberStub does not require or provide this. + */ +public class PubSubSubscriberFactoryForEmulator implements PubSubSubscriberFactory { Review comment: Do you mean `flink-test-utils` ? That sounds like a good idea. What do I need to change to make sure it ends up there? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Reopened] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException
[ https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jürgen Kreileder reopened FLINK-11420: -- Just made a test run with 1.8-SNAPSHOT (commit 9d87fe1) and I've seen a few of those exceptions again: {code:java} java.lang.ArrayIndexOutOfBoundsException: -1 at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157) at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822) at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:90) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:230) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:90) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:90) at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:230) at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:90) at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92) at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:537) at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:506) at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99) at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:297) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:321) at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) {code} > Serialization of case classes containing a Map[String, Any] sometimes throws > ArrayIndexOutOfBoundsException > --- > > Key: FLINK-11420 > URL: https://issues.apache.org/jira/browse/FLINK-11420 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System >Affects Versions: 1.7.1 >Reporter: Jürgen Kreileder >Assignee: Dawid Wysakowicz >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.3, 1.8.0 > > Time Spent: 1h > Remaining Estimate: 0h > > We frequently run into random ArrayIndexOutOfBounds 
exceptions when flink > tries to serialize Scala case classes containing a Map[String, Any] (Any > being String, Long, Int, or Boolean) with the FsStateBackend. (This probably > happens with any case class containing a type requiring Kryo, see this thread > for instance: >
[jira] [Created] (FLINK-11863) Introduce channel to read and write compressed data
Kurt Young created FLINK-11863: -- Summary: Introduce channel to read and write compressed data Key: FLINK-11863 URL: https://issues.apache.org/jira/browse/FLINK-11863 Project: Flink Issue Type: New Feature Reporter: Kurt Young Assignee: Kurt Young -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11861) JobMasterTriggerSavepointIT case is not executed
[ https://issues.apache.org/jira/browse/FLINK-11861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11861: --- Labels: pull-request-available (was: ) > JobMasterTriggerSavepointIT case is not executed > > > Key: FLINK-11861 > URL: https://issues.apache.org/jira/browse/FLINK-11861 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.2 >Reporter: shuai.xu >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > > The > [JobMasterTriggerSavepointIT|https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java] > will not be executed as the case name does not follow the style ***ITCase. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #7943: [FLINK-11861][tests] Fix JobMasterTriggerSavepointIT not executed
flinkbot commented on issue #7943: [FLINK-11861][tests] Fix JobMasterTriggerSavepointIT not executed URL: https://github.com/apache/flink/pull/7943#issuecomment-470903120 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] TisonKun opened a new pull request #7943: [FLINK-11861][tests] Fix JobMasterTriggerSavepointIT not executed
TisonKun opened a new pull request #7943: [FLINK-11861][tests] Fix JobMasterTriggerSavepointIT not executed URL: https://github.com/apache/flink/pull/7943 ## What is the purpose of the change As reported by @shuai-xu, `JobMasterTriggerSavepointIT` doesn't follow the pattern "*ITCase.*" and thus is not executed in the `tests` profile (it should be). Renaming `JobMasterTriggerSavepointIT` to `JobMasterTriggerSavepointITCase` should resolve this issue. ``` integration-tests integration-test test **/*ITCase.* false ``` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) cc @tillrohrmann This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-11861) JobMasterTriggerSavepointIT case is not executed
[ https://issues.apache.org/jira/browse/FLINK-11861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun reassigned FLINK-11861: Assignee: TisonKun > JobMasterTriggerSavepointIT case is not executed > > > Key: FLINK-11861 > URL: https://issues.apache.org/jira/browse/FLINK-11861 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.2 >Reporter: shuai.xu >Assignee: TisonKun >Priority: Major > > The > [JobMasterTriggerSavepointIT|https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java] > will not be executed as the case name does not follow the style ***ITCase. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] zhijiangW commented on a change in pull request #7713: [FLINK-10995][network] Copy intermediate serialization results only once for broadcast mode
zhijiangW commented on a change in pull request #7713: [FLINK-10995][network] Copy intermediate serialization results only once for broadcast mode URL: https://github.com/apache/flink/pull/7713#discussion_r263749333 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java ## @@ -44,4 +52,67 @@ public BroadcastRecordWriter( public void emit(T record) throws IOException, InterruptedException { broadcastEmit(record); } + + @Override + public void broadcastEmit(T record) throws IOException, InterruptedException { Review comment: Yes, you are right. We do not need to hardcode the size of the latency marker. :) I have submitted the code in a separate commit. I am not sure whether this change is suitable based on our comments, so I only refactored the main processes and ignored the tests with annotations. After your review and confirmation, I will refactor the commits and tests. I think it is better to explain some thoughts below: In order to reuse the main process of `RecordWriter#copyFromSerializerToTargetChannel`, the `randomTriggered` field is still maintained in `BroadcastRecordWriter`, but I think the logic is easier to understand than in the first version. After abstracting `RecordWriter` into two separate implementations, the changes seem more complex, because there are many abstract methods involved in `BufferBuilder` and `ChannelSelector`, which are the main differences between the two implementations. I am not sure you could accept the magic 0 in `BroadcastRecordWriter#broadcastEmit` used for reusing the common code. If we do not pass the target channel in the broadcast operation, we might need to implement all the related processes in `BroadcastRecordWriter`. I think my first version is easier to follow if we only change the process of `BroadcastRecordWriter#randomEmit` as now. Maybe you have other good ways. :) This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7713: [FLINK-10995][network] Copy intermediate serialization results only once for broadcast mode
zhijiangW commented on a change in pull request #7713: [FLINK-10995][network] Copy intermediate serialization results only once for broadcast mode URL: https://github.com/apache/flink/pull/7713#discussion_r263749333 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java ## @@ -44,4 +52,67 @@ public BroadcastRecordWriter( public void emit(T record) throws IOException, InterruptedException { broadcastEmit(record); } + + @Override + public void broadcastEmit(T record) throws IOException, InterruptedException { Review comment: Yes, you are right. We do not need to hardcode the size of the latency marker. :) I have submitted the code in a separate commit. I am not sure whether this change is suitable based on our comments, so I only refactored the main processes and ignored the tests with annotations. After your review and confirmation, I will refactor the commits and tests. I think it is better to explain some thoughts below: In order to reuse the main process of `RecordWriter#copyFromSerializerToTargetChannel`, the `randomTriggered` field is still maintained in `BroadcastRecordWriter`, but I think the logic is easier to understand than in the first version. After abstracting `RecordWriter` into two separate implementations, the changes seem more complex, because there are many abstract methods involved in `BufferBuilder` and `ChannelSelector`, which are the main differences between the two implementations. I am not sure you could accept the magic 0 in `BroadcastRecordWriter#broadcastEmit` used for reusing the common code. If we do not pass the target channel in the broadcast operation, we might need to implement all the related processes in `BroadcastRecordWriter`. I think my first version is easier to follow if we only change the process of `BroadcastRecordWriter#randomEmit` as now. Maybe you have other good ways. :) This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11818) Provide pipe transformation function for DataSet API
[ https://issues.apache.org/jira/browse/FLINK-11818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787763#comment-16787763 ] vinoyang commented on FLINK-11818: -- Hi [~hequn8128], in fact, my idea is not much different from the current implementation in Spark. 1) We can provide multiple overloaded methods called pipe for the DataSet object, e.g. {{pipe(String cmd)/pipe(String cmd, Map env)...}}: Flink pipes the data into the external program and returns the external program's output as a new DataSet. [1] [2] 2) I think its semantics are similar to Spark's. [1]: [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala] [2]: [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala] What do you think? cc [~fhueske] [~till.rohrmann] > Provide pipe transformation function for DataSet API > > > Key: FLINK-11818 > URL: https://issues.apache.org/jira/browse/FLINK-11818 > Project: Flink > Issue Type: Improvement > Components: API / DataSet >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > We have some business requirements that require the data handled by Flink to > interact with some external programs (such as Python/Perl/shell scripts). > There is no such function in the existing DataSet API; although it can be > implemented with a map function, that is not concise. It would be helpful > if we could provide a pipe[1] function like Spark's. > [1]: > https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11862) many differents query on same stream cause second condition of where of second query error
[ https://issues.apache.org/jira/browse/FLINK-11862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhengbm updated FLINK-11862: Description: List fields = Lists.newArrayList("rawMessage","timestamp"); Schema schema = new Schema(); for (int i = 0; i < fields.size(); i++) { schema.field(fields.get(i), Types.STRING()).from(fields.get(i)); } tableEnvironment.connect(new Kafka() .version("0.8") .properties(properties) .topic("raw_playtime_h5_source") .startFromLatest() ) .withFormat(new Json().failOnMissingField(false).deriveSchema()) .withSchema(schema) .inAppendMode() .registerTableSource("t1"); Table table2 = tableEnvironment .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as flash from t1 ,LATERAL TABLE(split(rawMessage,'\\t')) as T(maps) "); tableEnvironment.registerTable("t2", table2); Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2 where maps_length>0"); TypeInformation typeInformation = table.getSchema().toRowType(); String[] columns = table.getSchema().getFieldNames(); DataStream dataStream = tableEnvironment .toAppendStream(table, typeInformation) .map(new PhysicTransformMap(columns, 0)); dataStream.print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } {color:#d04437}Note: the Kafka messages look like \{"timestamp" : "","rawMessage":"xxx\txxx\t\t"}. If I remove the WHERE condition from the second query, it works; "select `timestamp`,maps_length from t2 " and "select `timestamp`,maps_length from t2 where timestamp>0" also work.{color} {color:#d04437}The exception stack trace follows:{color} Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Invalid input access. 
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587) at scala.Option.getOrElse(Option.scala:120) at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587) at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66) at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744) at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66) at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744) at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66) at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247) at org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155) at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38) at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116) at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:894) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:864) at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:224) at
[jira] [Updated] (FLINK-11862) many differents query on same stream cause second condition of where of second query error
[ https://issues.apache.org/jira/browse/FLINK-11862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhengbm updated FLINK-11862: Summary: many differents query on same stream cause second condition of where of second query error (was: 在同一条流上进行多次不同的sql,第二个sql的where条件不可用) > many differents query on same stream cause second condition of where of > second query error > -- > > Key: FLINK-11862 > URL: https://issues.apache.org/jira/browse/FLINK-11862 > Project: Flink > Issue Type: Bug > Components: API / Table SQL >Affects Versions: 1.7.2 > Environment: flink 1.7版本 java 1.8 >Reporter: zhengbm >Priority: Major > > List fields = Lists.newArrayList("rawMessage","timestamp"); > Schema schema = new Schema(); > for (int i = 0; i < fields.size(); i++) > { schema.field(fields.get(i), Types.STRING()).from(fields.get(i)); } > tableEnvironment.connect(new Kafka() > .version("0.8") > .properties(properties) > .topic("raw_playtime_h5_source") > .startFromLatest() > ) > .withFormat(new Json().failOnMissingField(false).deriveSchema()) > .withSchema(schema) > .inAppendMode() > .registerTableSource("t1"); > Table table2 = tableEnvironment > .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as > flash from t1 ,LATERAL TABLE(split(rawMessage,' > t')) as T(maps) "); > tableEnvironment.registerTable("t2", table2); > Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from > t2 where maps_length>0"); > TypeInformation typeInformation = table.getSchema().toRowType(); > String[] columns = table.getSchema().getFieldNames(); > DataStream dataStream = tableEnvironment > .toAppendStream(table, typeInformation) > .map(new PhysicTransformMap(columns, 0)); > dataStream.print(); > try > { env.execute(); } > catch (Exception e) > { e.printStackTrace(); } > {color:#d04437}注:kafka中的数据流格式如下\{"timestamp" : > "","rawMessage":"xxx\txxx\t\t"}(如果where条件换成timestamp则可用,或者把where条件去掉程序运行正常)函数split是把一条string切分为map结构{color} > 报错信息如下 > Exception in thread 
"main" org.apache.flink.table.codegen.CodeGenException: > Invalid input access. > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587) > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587) > at scala.Option.getOrElse(Option.scala:120) > at > org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587) > at > org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66) > at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112) > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754) > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744) > at > org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754) > at > org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:245) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744) > at > org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66) > at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) > at > org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247) > at > org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155) > at >
[jira] [Updated] (FLINK-11862) 在同一条流上进行多次不同的sql,第二个sql的where条件不可用
[ https://issues.apache.org/jira/browse/FLINK-11862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhengbm updated FLINK-11862: Description: List fields = Lists.newArrayList("rawMessage","timestamp"); Schema schema = new Schema(); for (int i = 0; i < fields.size(); i++) { schema.field(fields.get(i), Types.STRING()).from(fields.get(i)); } tableEnvironment.connect(new Kafka() .version("0.8") .properties(properties) .topic("raw_playtime_h5_source") .startFromLatest() ) .withFormat(new Json().failOnMissingField(false).deriveSchema()) .withSchema(schema) .inAppendMode() .registerTableSource("t1"); Table table2 = tableEnvironment .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as flash from t1 ,LATERAL TABLE(split(rawMessage,' t')) as T(maps) "); tableEnvironment.registerTable("t2", table2); Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2 where maps_length>0"); TypeInformation typeInformation = table.getSchema().toRowType(); String[] columns = table.getSchema().getFieldNames(); DataStream dataStream = tableEnvironment .toAppendStream(table, typeInformation) .map(new PhysicTransformMap(columns, 0)); dataStream.print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } {color:#d04437}注:kafka中的数据流格式如下\{"timestamp" : "","rawMessage":"xxx\txxx\t\t"}(如果where条件换成timestamp则可用,或者把where条件去掉程序运行正常)函数split是把一条string切分为map结构{color} 报错信息如下 Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Invalid input access. 
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587) at scala.Option.getOrElse(Option.scala:120) at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587) at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66) at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744) at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66) at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744) at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66) at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247) at org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155) at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38) at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116) at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:894) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:864) at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:224) at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:173) was: List fields = Lists.newArrayList("rawMessage","timestamp");
[jira] [Updated] (FLINK-11862) 在同一条流上进行多次不同的sql,第二个sql的where条件不可用
[ https://issues.apache.org/jira/browse/FLINK-11862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhengbm updated FLINK-11862: Description: List fields = Lists.newArrayList("rawMessage","timestamp"); Schema schema = new Schema(); for (int i = 0; i < fields.size(); i++) { schema.field(fields.get(i), Types.STRING()).from(fields.get(i)); } tableEnvironment.connect(new Kafka() .version("0.8") .properties(properties) .topic("raw_playtime_h5_source") .startFromLatest() ) .withFormat(new Json().failOnMissingField(false).deriveSchema()) .withSchema(schema) .inAppendMode() .registerTableSource("t1"); Table table2 = tableEnvironment .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as flash from t1 ,LATERAL TABLE(split(rawMessage,' t')) as T(maps) "); tableEnvironment.registerTable("t2", table2); Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2 where maps_length>0"); TypeInformation typeInformation = table.getSchema().toRowType(); String[] columns = table.getSchema().getFieldNames(); DataStream dataStream = tableEnvironment .toAppendStream(table, typeInformation) .map(new PhysicTransformMap(columns, 0)); dataStream.print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } {color:#d04437}注:kafka中的数据流格式如下\{"timestamp" : "","rawMessage":"xxx\txxx\t\t"}(如果where条件换成timestamp则可用,或者把where条件去掉程序运行正常){color} 报错信息如下 Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Invalid input access. 
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587) at scala.Option.getOrElse(Option.scala:120) at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587) at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66) at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744) at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66) at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744) at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66) at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247) at org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155) at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38) at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116) at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:894) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:864) at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:224) at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:173) was: List fields = Lists.newArrayList("rawMessage","timestamp"); Schema schema = new Schema();
[jira] [Created] (FLINK-11862) 在同一条流上进行多次不同的sql,第二个sql的where条件不可用
zhengbm created FLINK-11862: --- Summary: 在同一条流上进行多次不同的sql,第二个sql的where条件不可用 Key: FLINK-11862 URL: https://issues.apache.org/jira/browse/FLINK-11862 Project: Flink Issue Type: Bug Components: API / Table SQL Affects Versions: 1.7.2 Environment: flink 1.7版本 java 1.8 Reporter: zhengbm List fields = Lists.newArrayList("rawMessage","timestamp"); Schema schema = new Schema(); for (int i = 0; i < fields.size(); i++) { schema.field(fields.get(i), Types.STRING()).from(fields.get(i)); } tableEnvironment.connect(new Kafka() .version("0.8") .properties(properties) .topic("raw_playtime_h5_source") .startFromLatest() ) .withFormat(new Json().failOnMissingField(false).deriveSchema()) .withSchema(schema) .inAppendMode() .registerTableSource("t1"); Table table2 = tableEnvironment .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as flash from t1 ,LATERAL TABLE(split(rawMessage,'\\t')) as T(maps) "); tableEnvironment.registerTable("t2", table2); Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2 where maps_length>0"); TypeInformation typeInformation = table.getSchema().toRowType(); String[] columns = table.getSchema().getFieldNames(); DataStream dataStream = tableEnvironment .toAppendStream(table, typeInformation) .map(new PhysicTransformMap(columns, 0)); dataStream.print(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } 注:kafka中的数据流格式如下\{"timestamp" : "","rawMessage":"xxx\txxx\t\t"} Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Invalid input access. 
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587) at scala.Option.getOrElse(Option.scala:120) at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587) at org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66) at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744) at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66) at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754) at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744) at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66) at org.apache.calcite.rex.RexCall.accept(RexCall.java:107) at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247) at org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155) at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38) at org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116) at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:894) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:864) at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:224) at
[jira] [Created] (FLINK-11861) JobMasterTriggerSavepointIT case is not executed
shuai.xu created FLINK-11861: Summary: JobMasterTriggerSavepointIT case is not executed Key: FLINK-11861 URL: https://issues.apache.org/jira/browse/FLINK-11861 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.7.2 Reporter: shuai.xu The [JobMasterTriggerSavepointIT|https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java] test will not be executed because its class name does not follow the *ITCase naming convention. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11858) Introduce block compressor/decompressor for batch table runtime
[ https://issues.apache.org/jira/browse/FLINK-11858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-11858. -- Resolution: Implemented Fix Version/s: 1.9.0 fixed in 2345bbe0c8cfc1af75b8faef243dcde11777d5b7 > Introduce block compressor/decompressor for batch table runtime > --- > > Key: FLINK-11858 > URL: https://issues.apache.org/jira/browse/FLINK-11858 > Project: Flink > Issue Type: Improvement > Components: Runtime / Operators >Reporter: Kurt Young >Assignee: Kurt Young >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] KurtYoung closed pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
KurtYoung closed pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime URL: https://github.com/apache/flink/pull/7941 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10575) Remove deprecated ExecutionGraphBuilder.buildGraph method
[ https://issues.apache.org/jira/browse/FLINK-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787698#comment-16787698 ] TisonKun commented on FLINK-10575: -- [~till.rohrmann] Besides removing the deprecated method, I notice that the parameter {{prior}} is always {{null}}. Is it still a valid concept that we might attach a job graph to an existing execution graph? {code:java} /** * Builds the ExecutionGraph from the JobGraph. * If a prior execution graph exists, the JobGraph will be attached. If no prior execution * graph exists, then the JobGraph will become attach to a new empty execution graph. */ public static ExecutionGraph buildGraph( @Nullable ExecutionGraph prior, {code} > Remove deprecated ExecutionGraphBuilder.buildGraph method > - > > Key: FLINK-10575 > URL: https://issues.apache.org/jira/browse/FLINK-10575 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.7.0 >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > > ExecutionGraphBuilder is not a public API and we should be able to remove > deprecated methods such as: > @Deprecated > public static ExecutionGraph buildGraph > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11837) Improve internal data format
[ https://issues.apache.org/jira/browse/FLINK-11837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-11837. -- Resolution: Implemented Fix Version/s: 1.9.0 fixed in 043540bfd29ff1886c83c71c7b178d1261fa35bd > Improve internal data format > > > Key: FLINK-11837 > URL: https://issues.apache.org/jira/browse/FLINK-11837 > Project: Flink > Issue Type: New Feature > Components: Runtime / Operators >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > In FLINK-11701, we introduced an abstract set of internal data formats to > table-runtime-blink. This JIRA is a complement to it. > Introduce Decimal: The scale of this object is specified by the user, not > automatically determined (as with BigDecimal). > Introduce BinaryGeneric(GenericType): We don't care about specific types; we > work through serializers. > Introduce LazyBinaryFormat: It can exist in two formats: binary or Java object -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] KurtYoung closed pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format
KurtYoung closed pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format URL: https://github.com/apache/flink/pull/7913 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on issue #7913: [FLINK-11837][table-runtime-blink] Improve internal data format
KurtYoung commented on issue #7913: [FLINK-11837][table-runtime-blink] Improve internal data format URL: https://github.com/apache/flink/pull/7913#issuecomment-470860725 LGTM, merging... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10575) Remove deprecated ExecutionGraphBuilder.buildGraph method
[ https://issues.apache.org/jira/browse/FLINK-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787697#comment-16787697 ] TisonKun commented on FLINK-10575: -- [~isunjin] We can work on this JIRA now. Since you seem to have been inactive for some time, I'd like to take it over in a few days. > Remove deprecated ExecutionGraphBuilder.buildGraph method > - > > Key: FLINK-10575 > URL: https://issues.apache.org/jira/browse/FLINK-10575 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Affects Versions: 1.7.0 >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > > ExecutionGraphBuilder is not a public API and we should be able to remove > deprecated methods such as: > @Deprecated > public static ExecutionGraph buildGraph > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11857) Introduce BinaryExternalSorter to batch table runtime
[ https://issues.apache.org/jira/browse/FLINK-11857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11857: --- Description: We need a sorter to take full advantage of the high performance of the Binary format. > Introduce BinaryExternalSorter to batch table runtime > - > > Key: FLINK-11857 > URL: https://issues.apache.org/jira/browse/FLINK-11857 > Project: Flink > Issue Type: New Feature > Components: Runtime / Operators >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > > We need a sorter to take full advantage of the high performance of the Binary > format. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11857) Introduce BinaryExternalSorter to batch table runtime
[ https://issues.apache.org/jira/browse/FLINK-11857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-11857: --- Environment: (was: We need a sorter to take full advantage of the high performance of the Binary format.) > Introduce BinaryExternalSorter to batch table runtime > - > > Key: FLINK-11857 > URL: https://issues.apache.org/jira/browse/FLINK-11857 > Project: Flink > Issue Type: New Feature > Components: Runtime / Operators >Reporter: Jingsong Lee >Assignee: Jingsong Lee >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] KurtYoung commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes
KurtYoung commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes URL: https://github.com/apache/flink/pull/7931#discussion_r263702948 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelNodeUtil.scala ## @@ -0,0 +1,630 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.util + +import org.apache.flink.api.common.operators.Order +import org.apache.flink.table.CalcitePair +import org.apache.flink.table.`type`.InternalType +import org.apache.flink.table.api.TableException +import org.apache.flink.table.dataview.DataViewSpec +import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction} +import org.apache.flink.table.plan.FlinkJoinRelType +import org.apache.flink.table.plan.nodes.ExpressionFormat +import org.apache.flink.table.plan.nodes.ExpressionFormat.ExpressionFormat +import org.apache.flink.table.plan.nodes.physical.batch.BatchPhysicalRel +import org.apache.flink.table.typeutils.BinaryRowSerializer + +import org.apache.calcite.plan.{RelOptCost, RelOptPlanner} +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.Window.Group +import org.apache.calcite.rel.core.{AggregateCall, Calc, Window} +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelFieldCollation, RelNode, RelWriter} +import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexNode, RexProgram, RexWindowBound} +import org.apache.calcite.sql.SqlKind +import org.apache.calcite.util.mapping.IntPair + +import java.util + +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +/** + * Utility methods for RelNode. + */ +object RelNodeUtil { Review comment: BTW, this class name is also too big This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes
KurtYoung commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes URL: https://github.com/apache/flink/pull/7931#discussion_r263702820 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelNodeUtil.scala ## @@ -0,0 +1,630 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.util + +import org.apache.flink.api.common.operators.Order +import org.apache.flink.table.CalcitePair +import org.apache.flink.table.`type`.InternalType +import org.apache.flink.table.api.TableException +import org.apache.flink.table.dataview.DataViewSpec +import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction} +import org.apache.flink.table.plan.FlinkJoinRelType +import org.apache.flink.table.plan.nodes.ExpressionFormat +import org.apache.flink.table.plan.nodes.ExpressionFormat.ExpressionFormat +import org.apache.flink.table.plan.nodes.physical.batch.BatchPhysicalRel +import org.apache.flink.table.typeutils.BinaryRowSerializer + +import org.apache.calcite.plan.{RelOptCost, RelOptPlanner} +import org.apache.calcite.rel.RelFieldCollation.Direction +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.Window.Group +import org.apache.calcite.rel.core.{AggregateCall, Calc, Window} +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelFieldCollation, RelNode, RelWriter} +import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexNode, RexProgram, RexWindowBound} +import org.apache.calcite.sql.SqlKind +import org.apache.calcite.util.mapping.IntPair + +import java.util + +import scala.collection.JavaConversions._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +/** + * Utility methods for RelNode. + */ +object RelNodeUtil { Review comment: I think this utility class should focus on operator name formatting, things like compute cost should be operator specific This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] asfgit merged pull request #7923: [FLINK-11703][table-planner-blink] Introduce TableEnvironments and support registerDataStream and sqlQuery
asfgit merged pull request #7923: [FLINK-11703][table-planner-blink] Introduce TableEnvironments and support registerDataStream and sqlQuery URL: https://github.com/apache/flink/pull/7923 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-11703) Introduce TableEnvironment and support registerDataStream and sqlQuery
[ https://issues.apache.org/jira/browse/FLINK-11703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-11703. --- Resolution: Fixed Fixed in 1.9: 586b890237bc2516fa202bfb8c89f5dea4209143 > Introduce TableEnvironment and support registerDataStream and sqlQuery > -- > > Key: FLINK-11703 > URL: https://issues.apache.org/jira/browse/FLINK-11703 > Project: Flink > Issue Type: New Feature > Components: SQL / Planner >Reporter: Jark Wu >Assignee: Jark Wu >Priority: Major > Labels: pull-request-available > Time Spent: 0.5h > Remaining Estimate: 0h > > In order to support plan test, we need the ability to register a DataStream > (or TableSource) and perform sql query on it. > This issue is aiming to introduce a basic and simple TableEnvironment which > only support {{registerDataStream}} and {{sqlQuery}}. > In the context of FLINK-11067 and FLINK-11068 (convert TableEnvironments and > Tables to interfaces), the introduced {{TableEnvironment}} in > {{table-planner-blink}} is a temporary solution and will be refactored/moved > to a {{BlinkPlanner}}, once FLINK-11067, FLINK-11068 is done. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11068) Convert the API classes *Table, *Window to interfaces
[ https://issues.apache.org/jira/browse/FLINK-11068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787669#comment-16787669 ] Hequn Cheng commented on FLINK-11068: - [~twalthr] Hi, I have almost finished this issue. I think it's good to sync with you first, and then submit the PR once the Expression PR is finished. The main issues that need to be confirmed with you are as follows: 1. Create only one Table interface in flink-table-api-java and rename the current Table to TableImpl. 2. Considering {{OverWindowedTable window(OverWindow... overWindows)}} in Table, the {{WindowedTable window(Window window)}} may be confusing to users. From the API, it looks like OverWindow is a subclass of Window; however, the Window is actually a GroupWindow, so maybe we can change the API from {code:java} WindowedTable window(Window window); {code} to {code:java} GroupWindowedTable window(GroupWindow groupWindow); {code} and the two APIs would then be as follows (when we put them together), which I think is clearer: {code:java} GroupWindowedTable window(GroupWindow groupWindow); OverWindowedTable window(OverWindow... overWindows); {code} 3. TableConversions.scala and other conversion classes will stay in the current folder for now and be moved into the api module once TableEnvironment has been converted into an interface. It would be great to have your suggestions. Thank you! > Convert the API classes *Table, *Window to interfaces > - > > Key: FLINK-11068 > URL: https://issues.apache.org/jira/browse/FLINK-11068 > Project: Flink > Issue Type: Improvement > Components: API / Table SQL >Reporter: Timo Walther >Assignee: Hequn Cheng >Priority: Major > > A more detailed description can be found in > [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> This includes: Table, GroupedTable, WindowedTable, WindowGroupedTable, > OverWindowedTable, Window, OverWindow > We can keep the "Table" Scala implementation in a planner module until it has > been converted to Java. > We can add a method to the planner later to give us a concrete instance. This > is one possibility to have a smooth transition period instead of changing all > classes at once. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] wuchong commented on issue #7923: [FLINK-11703][table-planner-blink] Introduce TableEnvironments and support registerDataStream and sqlQuery
wuchong commented on issue #7923: [FLINK-11703][table-planner-blink] Introduce TableEnvironments and support registerDataStream and sqlQuery URL: https://github.com/apache/flink/pull/7923#issuecomment-470849645 The failed python test is not related to this PR. Merging... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on issue #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
JingsongLi commented on issue #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime URL: https://github.com/apache/flink/pull/7941#issuecomment-470848379 +1 LGTM This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] KurtYoung commented on issue #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime
KurtYoung commented on issue #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime URL: https://github.com/apache/flink/pull/7941#issuecomment-470848112 Hi @JingsongLi, thanks for the review. I have addressed your comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format
JingsongLi commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format URL: https://github.com/apache/flink/pull/7913#discussion_r263696844 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java ## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataformat; + +/** + * Join two row to one row. 
+ */ +public final class JoinedRow implements BaseRow { + + private BaseRow row1; + private BaseRow row2; + private byte header; + + public JoinedRow() {} + + public JoinedRow(BaseRow row1, BaseRow row2) { + this.row1 = row1; + this.row2 = row2; + } + + public JoinedRow replace(BaseRow row1, BaseRow row2) { + this.row1 = row1; + this.row2 = row2; + return this; + } + + @Override + public int getArity() { + return row1.getArity() + row2.getArity(); + } + + @Override + public byte getHeader() { + return header; + } + + @Override + public void setHeader(byte header) { + this.header = header; + } + + @Override + public boolean isNullAt(int i) { + if (i < row1.getArity()) { + return row1.isNullAt(i); + } else { + return row2.isNullAt(i - row1.getArity()); + } + } + + @Override + public boolean getBoolean(int i) { + if (i < row1.getArity()) { + return row1.getBoolean(i); + } else { + return row2.getBoolean(i - row1.getArity()); + } + } + + @Override + public byte getByte(int i) { + if (i < row1.getArity()) { + return row1.getByte(i); + } else { + return row2.getByte(i - row1.getArity()); + } + } + + @Override + public short getShort(int i) { + if (i < row1.getArity()) { + return row1.getShort(i); + } else { + return row2.getShort(i - row1.getArity()); + } + } + + @Override + public int getInt(int i) { + if (i < row1.getArity()) { + return row1.getInt(i); + } else { + return row2.getInt(i - row1.getArity()); + } + } + + @Override + public long getLong(int i) { + if (i < row1.getArity()) { + return row1.getLong(i); + } else { + return row2.getLong(i - row1.getArity()); + } + } + + @Override + public float getFloat(int i) { + if (i < row1.getArity()) { + return row1.getFloat(i); + } else { + return row2.getFloat(i - row1.getArity()); + } + } + + @Override + public double getDouble(int i) { + if (i < row1.getArity()) { + return row1.getDouble(i); + } else { + return row2.getDouble(i - row1.getArity()); + } + } + + @Override + public char getChar(int i) { + if (i < 
row1.getArity()) { + return row1.getChar(i); + } else { + return row2.getChar(i - row1.getArity()); + } + } + + @Override + public Decimal getDecimal(int i, int precision, int scale) { + if (i < row1.getArity()) { + return row1.getDecimal(i, precision, scale); + } else { + return row2.getDecimal(i - row1.getArity(), precision, scale); + } + } + + @Override + public BinaryGeneric getGeneric(int i) { + if (i < row1.getArity()) { +