[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
JingsongLi commented on a change in pull request #7944: 
[FLINK-11863][table-runtime-blink] Introduce channel to read and write 
compressed data
URL: https://github.com/apache/flink/pull/7944#discussion_r263989639
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.compression.BlockCompressor;
+import org.apache.flink.table.runtime.compression.BlockDecompressor;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Compressed block channel reader provides a scenario where MemorySegment 
must be maintained.
+ */
+public class CompressedBlockChannelReader
+   implements BlockChannelReader<MemorySegment>, RequestDoneCallback<Buffer>, BufferRecycler {
+
+   private final LinkedBlockingQueue<MemorySegment> blockQueue;
+   private final boolean copyCompress;
+   private final BlockDecompressor decompressor;
+   private final BufferFileReader reader;
+   private final AtomicReference<IOException> cause;
+   private final LinkedBlockingQueue<Buffer> retBuffers = new LinkedBlockingQueue<>();
+
+   private byte[] buf;
+   private ByteBuffer bufWrapper;
+   private int offset;
+   private int len;
+
+   public CompressedBlockChannelReader(
+   IOManager ioManager,
+   ID channel,
+   LinkedBlockingQueue<MemorySegment> blockQueue,
+   BlockCompressionFactory codecFactory,
+   int preferBlockSize,
+   int segmentSize) throws IOException {
+   this.reader = ioManager.createBufferFileReader(channel, this);
+   this.blockQueue = blockQueue;
+   copyCompress = preferBlockSize > segmentSize * 2;
+   int blockSize = copyCompress ? preferBlockSize : segmentSize;
+   this.decompressor = codecFactory.getDecompressor();
+   cause = new AtomicReference<>();
+
+   if (copyCompress) {
+   this.buf = new byte[blockSize];
+   this.bufWrapper = ByteBuffer.wrap(buf);
+   }
+
+   BlockCompressor compressor = codecFactory.getCompressor();
+   for (int i = 0; i < 2; i++) {
+   MemorySegment segment = MemorySegmentFactory.wrap(new 
byte[compressor.getMaxCompressedSize(blockSize)]);
+   reader.readInto(new NetworkBuffer(segment, this));
+   }
+   }
+
+   @Override
+   public void readBlock(MemorySegment segment) throws IOException {
+   if (cause.get() != null) {
+   throw cause.get();
+   }
+
+   if (copyCompress) {
+   int readOffset = 0;
+   int readLen = segment.size();
+
+   while (readLen > 0) {
+   int copy = Math.min(readLen, len - offset);
+   if (copy == 0) {
+   readBuffer();
+   } else {
+   segment.put(readOffset, buf, offset, 
copy);
+  

[GitHub] [flink] KurtYoung merged pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
KurtYoung merged pull request #7944: [FLINK-11863][table-runtime-blink] 
Introduce channel to read and write compressed data
URL: https://github.com/apache/flink/pull/7944
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on issue #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
KurtYoung commented on issue #7944: [FLINK-11863][table-runtime-blink] 
Introduce channel to read and write compressed data
URL: https://github.com/apache/flink/pull/7944#issuecomment-471146491
 
 
   Looks like we also need to do some more refactoring after the merge. I have 
created https://issues.apache.org/jira/browse/FLINK-11864 to track the effort. 




[jira] [Created] (FLINK-11864) Let compressed channel reader/writer reuse the logic of AsynchronousFileIOChannel

2019-03-08 Thread Kurt Young (JIRA)
Kurt Young created FLINK-11864:
--

 Summary: Let compressed channel reader/writer reuse the logic of 
AsynchronousFileIOChannel
 Key: FLINK-11864
 URL: https://issues.apache.org/jira/browse/FLINK-11864
 Project: Flink
  Issue Type: Improvement
Reporter: Kurt Young


This is a follow up issue of 
[Flink-11863|https://issues.apache.org/jira/browse/FLINK-11863]. The introduced 
`CompressedBlockChannelReader` and `CompressedBlockChannelWriter` should reuse 
the logic of `AsynchronousFileIOChannel` by extending from it. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials

2019-03-08 Thread GitBox
link3280 commented on a change in pull request #7895: 
[FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
URL: https://github.com/apache/flink/pull/7895#discussion_r263984640
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
 ##
 @@ -58,4 +87,84 @@ public void testDeleteApplicationFiles() throws Exception {
assertThat(files.count(), equalTo(0L));
}
}
+
+   @Test
+   public void testCreateTaskExecutorContext() throws Exception {
 
 Review comment:
   Yeah, I think it's somewhat complicated as a unit test and does not have good 
separation of concerns. Actually, the test is extended from a test in legacy 
mode that only verifies the local resources of TaskManagers. It's a good idea 
to refactor it into a util method and make it available in `flink-yarn-tests`, 
but I still want to have a test for `Utils.createTaskExecutorContext`, so I 
prefer to split the test into two and add an extra util method to verify 
tokens in `flink-yarn-tests`. What do you think?




[GitHub] [flink] link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials

2019-03-08 Thread GitBox
link3280 commented on a change in pull request #7895: 
[FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
URL: https://github.com/apache/flink/pull/7895#discussion_r263984640
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
 ##
 @@ -58,4 +87,84 @@ public void testDeleteApplicationFiles() throws Exception {
assertThat(files.count(), equalTo(0L));
}
}
+
+   @Test
+   public void testCreateTaskExecutorContext() throws Exception {
 
 Review comment:
   Yeah, I think it's somewhat complicated as a unit test and does not have good 
separation of concerns. Actually, the test is extended from a test in legacy 
mode that only verifies the local resources of TaskManagers. It's a good idea 
to refactor it into a util method and make it available in `flink-yarn-tests`, 
but I still want to have a test for `Utils.createTaskExecutorContext`.




[GitHub] [flink] link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials

2019-03-08 Thread GitBox
link3280 commented on a change in pull request #7895: 
[FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
URL: https://github.com/apache/flink/pull/7895#discussion_r263984640
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
 ##
 @@ -58,4 +87,84 @@ public void testDeleteApplicationFiles() throws Exception {
assertThat(files.count(), equalTo(0L));
}
}
+
+   @Test
+   public void testCreateTaskExecutorContext() throws Exception {
 
 Review comment:
   Yeah, I think it's somewhat complicated as a unit test and does not have good 
separation of concerns. Actually, the test is extended from a test in legacy 
mode that only verifies the local resources of TaskManagers. It's a good idea 
to refactor it into a util method and move it to `flink-yarn-tests`. 




[GitHub] [flink] KurtYoung commented on issue #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
KurtYoung commented on issue #7944: [FLINK-11863][table-runtime-blink] 
Introduce channel to read and write compressed data
URL: https://github.com/apache/flink/pull/7944#issuecomment-471141399
 
 
   @JingsongLi Thanks for the review, I have addressed some of your 
comments. 




[GitHub] [flink] link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials

2019-03-08 Thread GitBox
link3280 commented on a change in pull request #7895: 
[FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
URL: https://github.com/apache/flink/pull/7895#discussion_r263984332
 
 

 ##
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
 ##
 @@ -565,7 +565,20 @@ static ContainerLaunchContext createTaskExecutorContext(
new File(fileLocation),

HadoopUtils.getHadoopConfiguration(flinkConfig));
 
-   cred.writeTokenStorageToStream(dob);
+   // Filter out AMRMToken before setting the 
tokens to the TaskManager container context.
+   Method getAllTokensMethod = 
Credentials.class.getMethod("getAllTokens");
+   Credentials taskManagerCred = new Credentials();
+   final Text amRmTokenKind = new 
Text("YARN_AM_RM_TOKEN");
 
 Review comment:
   Good point, I will make it a `static final` member.
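
   A minimal sketch of the filtering idea with the token kind pulled out as a `static final` field; it calls `getAllTokens()` directly for brevity (the patch above accesses it via reflection), and the class and method names are illustrative, not the actual change:

{code:java}
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

class AmRmTokenFilterSketch {
    // the suggested static final member for the AMRMToken kind
    private static final Text AM_RM_TOKEN_KIND = new Text("YARN_AM_RM_TOKEN");

    /** Copies every token except the AMRMToken into the credentials handed to TaskManagers. */
    static Credentials withoutAmRmToken(Credentials source) {
        Credentials filtered = new Credentials();
        for (Token<? extends TokenIdentifier> token : source.getAllTokens()) {
            if (!AM_RM_TOKEN_KIND.equals(token.getKind())) {
                filtered.addToken(token.getService(), token);
            }
        }
        // note: secret keys are not copied in this sketch
        return filtered;
    }
}
{code}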




[GitHub] [flink] link3280 commented on a change in pull request #7895: [FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials

2019-03-08 Thread GitBox
link3280 commented on a change in pull request #7895: 
[FLINK-11126][YARN][security] Filter out AMRMToken in TaskManager‘s credentials
URL: https://github.com/apache/flink/pull/7895#discussion_r263984234
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
 ##
 @@ -18,25 +18,54 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 
 Review comment:
   @klion26 Thanks for the input. Although I still think the mock should be 
relatively safe here, I will replace it with some real instances as you 
suggested. 




[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
JingsongLi commented on a change in pull request #7944: 
[FLINK-11863][table-runtime-blink] Introduce channel to read and write 
compressed data
URL: https://github.com/apache/flink/pull/7944#discussion_r263981676
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java
 ##
 @@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.compression.BlockCompressor;
+import org.apache.flink.table.runtime.compression.BlockDecompressor;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ *
+ */
+public class CompressedHeaderlessChannelReaderInputView
+   extends AbstractChannelReaderInputView
+   implements RequestDoneCallback<Buffer>, BufferRecycler {
+
+   private final BlockDecompressor decompressor;
+   private final BufferFileReader reader;
+   private final MemorySegment uncompressedBuffer;
+   private final AtomicReference<IOException> cause;
+
+   private final LinkedBlockingQueue<Buffer> retBuffers = new LinkedBlockingQueue<>();
+
+   private int numBlocksRemaining;
+   private int currentSegmentLimit;
+
+   public CompressedHeaderlessChannelReaderInputView(
+   FileIOChannel.ID id,
+   IOManager ioManager,
+   BlockCompressionFactory compressionCodecFactory,
+   int compressionBlockSize,
+   int numBlocks) throws IOException {
+   super(0);
+   this.numBlocksRemaining = numBlocks;
+   this.reader = ioManager.createBufferFileReader(id, this);
+   uncompressedBuffer = MemorySegmentFactory.wrap(new 
byte[compressionBlockSize]);
+   decompressor = compressionCodecFactory.getDecompressor();
+   cause = new AtomicReference<>();
+
+   BlockCompressor compressor = 
compressionCodecFactory.getCompressor();
+   for (int i = 0; i < 2; i++) {
+   MemorySegment segment = MemorySegmentFactory.wrap(new 
byte[compressor.getMaxCompressedSize(
+   compressionBlockSize)]);
+   reader.readInto(new NetworkBuffer(segment, this));
+   }
+   }
+
+   @Override
+   protected MemorySegment nextSegment(MemorySegment current) throws 
IOException {
+   if (cause.get() != null) {
+   throw cause.get();
+   }
+
+   // check for end-of-stream
+   if (this.numBlocksRemaining <= 0) {
+   this.reader.close();
+   throw new EOFException();
+   }
+
+   try {
+   Buffer buffer;
+   while ((buffer = retBuffers.poll(1, TimeUnit.SECONDS)) 
== null) {
+   if (cause.get() != null) {
 
 Review comment:
   Shouldn't every loop iteration also check whether the reader has been closed?
   The same applies to AsynchronousBlockReader.getNextReturnedBlock.
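
   A minimal sketch of the loop shape being suggested, assuming a hypothetical volatile `closed` flag next to the existing `cause` reference (names are illustrative, not the actual patch):

{code:java}
import org.apache.flink.runtime.io.network.buffer.Buffer;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class PollLoopSketch {
    private final LinkedBlockingQueue<Buffer> retBuffers = new LinkedBlockingQueue<>();
    private final AtomicReference<IOException> cause = new AtomicReference<>();
    private volatile boolean closed; // hypothetical flag set by close()

    Buffer nextReturnedBuffer() throws IOException, InterruptedException {
        Buffer buffer;
        while ((buffer = retBuffers.poll(1, TimeUnit.SECONDS)) == null) {
            if (cause.get() != null) {
                throw cause.get();
            }
            if (closed) {
                // re-check the close state on every iteration so a concurrent
                // close() cannot leave the caller waiting forever
                throw new IOException("The channel has already been closed.");
            }
        }
        return buffer;
    }
}
{code}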



[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
JingsongLi commented on a change in pull request #7944: 
[FLINK-11863][table-runtime-blink] Introduce channel to read and write 
compressed data
URL: https://github.com/apache/flink/pull/7944#discussion_r263981141
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/AbstractPagedInputView.java
 ##
 @@ -160,6 +160,11 @@ protected final void advance() throws IOException {
this.positionInSegment = this.headerLength;
}
 
+   /**
+* This method will run after advance(). Override it if you want to use 
it.
+*/
+   protected void afterAdvance() {}
 
 Review comment:
   Is afterAdvance() only there to support setting the initial offset of the 
InputView? Adding an afterAdvance() method feels a little heavy for that. 
Could you instead override advance() in HeaderlessChannelReaderInputView and 
set positionInSegment the first time?
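
   For context, a self-contained illustration of the hook pattern under discussion; it deliberately does not use the real Flink classes, and the field names and initial offset value are assumptions:

{code:java}
// Mirrors the advance()/afterAdvance() relationship from the diff above, outside of Flink.
abstract class PagedInputViewSketch {
    protected final void advance() {
        // ... switch to the next segment, then let subclasses adjust their state
        afterAdvance();
    }

    /** Runs after advance(). Override it if the subclass needs to tweak the read position. */
    protected void afterAdvance() {}
}

class HeaderlessReaderSketch extends PagedInputViewSketch {
    private boolean firstSegment = true;
    private int positionInSegment;

    @Override
    protected void afterAdvance() {
        if (firstSegment) {
            // set the initial read offset only once; the review suggests doing this
            // by overriding advance() directly instead of adding a new hook
            positionInSegment = 16; // hypothetical initial offset
            firstSegment = false;
        }
    }
}
{code}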




[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
JingsongLi commented on a change in pull request #7944: 
[FLINK-11863][table-runtime-blink] Introduce channel to read and write 
compressed data
URL: https://github.com/apache/flink/pull/7944#discussion_r263981754
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java
 ##
 @@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import 
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
+import 
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import 
org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.io.ChannelWithMeta;
+import org.apache.flink.table.runtime.io.CompressedBlockChannelReader;
+import org.apache.flink.table.runtime.io.CompressedBlockChannelWriter;
+import 
org.apache.flink.table.runtime.io.CompressedHeaderlessChannelReaderInputView;
+import 
org.apache.flink.table.runtime.io.CompressedHeaderlessChannelWriterOutputView;
+import org.apache.flink.table.runtime.io.HeaderlessChannelWriterOutputView;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static 
org.apache.flink.core.memory.MemorySegmentFactory.allocateUnpooledSegment;
+
+/**
+ * File channel util for runtime.
+ */
+public class FileChannelUtil {
 
 Review comment:
   Could you add some comments to the methods here?




[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
JingsongLi commented on a change in pull request #7944: 
[FLINK-11863][table-runtime-blink] Introduce channel to read and write 
compressed data
URL: https://github.com/apache/flink/pull/7944#discussion_r263981558
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java
 ##
 @@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import 
org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.compression.BlockCompressor;
+import org.apache.flink.table.runtime.compression.BlockDecompressor;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ *
 
 Review comment:
   Could you add a class-level comment here?




[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
JingsongLi commented on a change in pull request #7944: 
[FLINK-11863][table-runtime-blink] Introduce channel to read and write 
compressed data
URL: https://github.com/apache/flink/pull/7944#discussion_r263981774
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
 ##
 @@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.compression.Lz4BlockCompressionFactory;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link CompressedHeaderlessChannelReaderInputView} and
+ * {@link CompressedHeaderlessChannelWriterOutputView}.
+ */
+public class CompressedHeaderlessChannelTest {
 
 Review comment:
   Could you add tests for CompressedBlockChannelReader and the writer too?




[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
JingsongLi commented on a change in pull request #7944: 
[FLINK-11863][table-runtime-blink] Introduce channel to read and write 
compressed data
URL: https://github.com/apache/flink/pull/7944#discussion_r263981511
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.compression.BlockCompressor;
+import org.apache.flink.table.runtime.compression.BlockDecompressor;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Compressed block channel reader provides a scenario where MemorySegment 
must be maintained.
+ */
+public class CompressedBlockChannelReader implements BlockChannelReader<MemorySegment>,
+   RequestDoneCallback<Buffer>,
+   BufferRecycler {
+
+   private final LinkedBlockingQueue<MemorySegment> blockQueue;
+
+   private final boolean copyCompress;
+   private final BlockDecompressor decompressor;
+   private final BufferFileReader reader;
+   private final AtomicReference<IOException> cause;
+
+   private final LinkedBlockingQueue<Buffer> retBuffers = new LinkedBlockingQueue<>();
+
+   private byte[] buf;
+   private ByteBuffer bufWrapper;
+   private int offset;
+   private int len;
+
+   public CompressedBlockChannelReader(
+   IOManager ioManager, ID channel,
+   LinkedBlockingQueue<MemorySegment> blockQueue,
+   BlockCompressionFactory codecFactory, int 
preferBlockSize, int segmentSize) throws IOException {
+   this.reader = ioManager.createBufferFileReader(channel, this);
+   this.blockQueue = blockQueue;
+   copyCompress = preferBlockSize > segmentSize * 2;
+   int blockSize = copyCompress ? preferBlockSize : segmentSize;
+   this.decompressor = codecFactory.getDecompressor();
+   cause = new AtomicReference<>();
+
+   if (copyCompress) {
+   this.buf = new byte[blockSize];
+   this.bufWrapper = ByteBuffer.wrap(buf);
+   }
+
+   BlockCompressor compressor = codecFactory.getCompressor();
+   for (int i = 0; i < 2; i++) {
+   MemorySegment segment = MemorySegmentFactory.wrap(new 
byte[compressor.getMaxCompressedSize(blockSize)]);
+   reader.readInto(new NetworkBuffer(segment, this));
+   }
+   }
+
+   @Override
+   public void readBlock(MemorySegment segment) throws IOException {
+   if (cause.get() != null) {
+   throw cause.get();
+   }
+
+   if (copyCompress) {
+   int readOffset = 0;
+   int readLen = segment.size();
+
+   while (readLen > 0) {
+   int copy = Math.min(readLen, len - offset);
+   if (copy == 0) {
+   readBuffer();
+   } else {
+   segment.put(readOffset, buf, offset, 
copy);
+   offset += copy;
+ 

[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
JingsongLi commented on a change in pull request #7944: 
[FLINK-11863][table-runtime-blink] Introduce channel to read and write 
compressed data
URL: https://github.com/apache/flink/pull/7944#discussion_r263981370
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileReader;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.RequestDoneCallback;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.compression.BlockCompressor;
+import org.apache.flink.table.runtime.compression.BlockDecompressor;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Compressed block channel reader provides a scenario where MemorySegment 
must be maintained.
+ */
+public class CompressedBlockChannelReader implements BlockChannelReader<MemorySegment>,
+   RequestDoneCallback<Buffer>,
+   BufferRecycler {
+
+   private final LinkedBlockingQueue<MemorySegment> blockQueue;
+
+   private final boolean copyCompress;
+   private final BlockDecompressor decompressor;
+   private final BufferFileReader reader;
+   private final AtomicReference<IOException> cause;
+
+   private final LinkedBlockingQueue<Buffer> retBuffers = new LinkedBlockingQueue<>();
+
+   private byte[] buf;
+   private ByteBuffer bufWrapper;
+   private int offset;
+   private int len;
+
+   public CompressedBlockChannelReader(
+   IOManager ioManager, ID channel,
+   LinkedBlockingQueue<MemorySegment> blockQueue,
+   BlockCompressionFactory codecFactory, int 
preferBlockSize, int segmentSize) throws IOException {
+   this.reader = ioManager.createBufferFileReader(channel, this);
+   this.blockQueue = blockQueue;
+   copyCompress = preferBlockSize > segmentSize * 2;
 
 Review comment:
   You can add a comment to explain that, in order to avoid multiple copies, a 
separate buffer for compression is only allocated when preferBlockSize > segmentSize * 2.
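
   For example, the suggested comment could read roughly like this (wording is only a suggestion, not the actual patch):

{code:java}
// To avoid copying the data an extra time, an intermediate compression buffer is
// only allocated when the preferred block size is clearly larger than a segment
// (preferBlockSize > segmentSize * 2); otherwise segments are compressed directly.
copyCompress = preferBlockSize > segmentSize * 2;
int blockSize = copyCompress ? preferBlockSize : segmentSize;
{code}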




[GitHub] [flink] JingsongLi commented on a change in pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
JingsongLi commented on a change in pull request #7944: 
[FLINK-11863][table-runtime-blink] Introduce channel to read and write 
compressed data
URL: https://github.com/apache/flink/pull/7944#discussion_r263981423
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelWriter.java
 ##
 @@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
+import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
+import org.apache.flink.table.runtime.compression.BlockCompressor;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Compressed block channel writer provides a scenario where MemorySegment 
must be maintained.
+ */
+public class CompressedBlockChannelWriter implements BlockChannelWriter<MemorySegment>,
+   BufferRecycler {
+
+   private final LinkedBlockingQueue<MemorySegment> blockQueue;
+   private final LinkedBlockingQueue<NetworkBuffer> compressedBuffers = new LinkedBlockingQueue<>();
+
+   private final BufferFileWriter writer;
+   private final boolean copyCompress;
+   private final BlockCompressor compressor;
+
+   private byte[] buf;
+   private ByteBuffer bufWrapper;
+   private int count;
+
+   public CompressedBlockChannelWriter(
+   IOManager ioManager, ID channel,
+   LinkedBlockingQueue<MemorySegment> blockQueue,
+   BlockCompressionFactory codecFactory, int 
preferBlockSize, int segmentSize) throws IOException {
+   this.writer = ioManager.createBufferFileWriter(channel);
+   this.blockQueue = blockQueue;
+   copyCompress = preferBlockSize > segmentSize * 2;
 
 Review comment:
   Same as for the BlockReader.




[GitHub] [flink] flinkbot commented on issue #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
flinkbot commented on issue #7944: [FLINK-11863][table-runtime-blink] Introduce 
channel to read and write compressed data
URL: https://github.com/apache/flink/pull/7944#issuecomment-471133713
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-11863) Introduce channel to read and write compressed data

2019-03-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11863:
---
Labels: pull-request-available  (was: )

> Introduce channel to read and write compressed data
> ---
>
> Key: FLINK-11863
> URL: https://issues.apache.org/jira/browse/FLINK-11863
> Project: Flink
>  Issue Type: New Feature
>Reporter: Kurt Young
>Assignee: Kurt Young
>Priority: Major
>  Labels: pull-request-available
>






[GitHub] [flink] KurtYoung commented on issue #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
KurtYoung commented on issue #7944: [FLINK-11863][table-runtime-blink] 
Introduce channel to read and write compressed data
URL: https://github.com/apache/flink/pull/7944#issuecomment-471133695
 
 
   cc @JingsongLi 




[GitHub] [flink] KurtYoung opened a new pull request #7944: [FLINK-11863][table-runtime-blink] Introduce channel to read and write compressed data

2019-03-08 Thread GitBox
KurtYoung opened a new pull request #7944: [FLINK-11863][table-runtime-blink] 
Introduce channel to read and write compressed data
URL: https://github.com/apache/flink/pull/7944
 
 
   
   
   ## What is the purpose of the change
   
   Introduce compressed channel reader/writer, and compressed headerless 
channel reader/writer based on our `BlockCompressionFactory`
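
   For orientation, a rough construction sketch pieced together from the constructors quoted in the review comments; the block/segment sizes and the no-argument `Lz4BlockCompressionFactory` constructor are assumptions, not part of this PR's description:

{code:java}
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.table.runtime.compression.BlockCompressionFactory;
import org.apache.flink.table.runtime.compression.Lz4BlockCompressionFactory;
import org.apache.flink.table.runtime.io.CompressedBlockChannelReader;
import org.apache.flink.table.runtime.io.CompressedBlockChannelWriter;

import java.util.concurrent.LinkedBlockingQueue;

class CompressedChannelSketch {
    static void roundTrip() throws Exception {
        IOManager ioManager = new IOManagerAsync();
        FileIOChannel.ID channel = ioManager.createChannel();
        LinkedBlockingQueue<MemorySegment> returnQueue = new LinkedBlockingQueue<>();
        BlockCompressionFactory codec = new Lz4BlockCompressionFactory();

        // write compressed blocks (64K preferred block size, 32K segments are example values)
        CompressedBlockChannelWriter writer = new CompressedBlockChannelWriter(
            ioManager, channel, returnQueue, codec, 64 * 1024, 32 * 1024);
        // ... writer.writeBlock(segment) for each MemorySegment, then writer.close() ...

        // read the same channel back through the matching compressed reader
        CompressedBlockChannelReader reader = new CompressedBlockChannelReader(
            ioManager, channel, returnQueue, codec, 64 * 1024, 32 * 1024);
        // ... reader.readBlock(segment) / reader.getNextReturnedBlock(), then reader.close() ...
    }
}
{code}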
   ## Brief change log
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Added unit test
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[jira] [Commented] (FLINK-11539) Add TypeSerializerSnapshot for TraversableSerializer

2019-03-08 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-11539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788325#comment-16788325
 ] 

Jürgen Kreileder commented on FLINK-11539:
--

[~aljoscha] Does this come with an expected slow-down in stream graph building 
due to compilation? Jobs that took seconds to start with 1.7.2 take up to 15 
minutes to start with 1.8-SNAPSHOT.

All "Transforming ... " steps from StreamGraphGenerator seem to be slower, some 
taking up to 5 seconds now.

> Add TypeSerializerSnapshot for TraversableSerializer
> 
>
> Key: FLINK-11539
> URL: https://issues.apache.org/jira/browse/FLINK-11539
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This will replace the deprecated {{TypeSerializerConfigSnapshot}}.





[jira] [Commented] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException

2019-03-08 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788315#comment-16788315
 ] 

Jürgen Kreileder commented on FLINK-11420:
--

Yep, concurrent access:
{code:java}
2019-03-08 21:11:55,674 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask           - Cleanup 
AsyncCheckpointRunnable for checkpoint 9 of Generate Finding Commands 377823 
(1/1).
2019-03-08 21:11:55,680 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Generate Finding Commands 377823 (1/1) 
(8bed70dd3de95f9c0c82daa19679a4be) switched from RUNNING to FAILED.
java.lang.IllegalStateException: Concurrent access to KryoSerializer. Thread 1: 
Generate Finding Commands 377823 (1/1) , Thread 2: pool-138-thread-1
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.enterExclusiveThread(KryoSerializer.java:630)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:242)
        at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
        at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:90)
        at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:230)
        at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
        at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:90)
        at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
        at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
        at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:90)
        at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:230)
        at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
        at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
        at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
        at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:90)
        at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
        at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
        at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
        at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at 
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:537)
        at 
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:506)
        at 
org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
        at 
org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
        at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:297)
        at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:321)
        at 
org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95)
        at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
{code}
The slow-down in job submission is unrelated. I think it's caused by 
FLINK-11539.

> Serialization of case classes containing a Map[String, Any] sometimes throws 
> ArrayIndexOutOfBoundsException
> ---
>
> Key: FLINK-11420
> URL: https://issues.apache.org/jira/browse/FLINK-11420
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Assignee: Dawid 

[jira] [Commented] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException

2019-03-08 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788302#comment-16788302
 ] 

Dawid Wysakowicz commented on FLINK-11420:
--

I wouldn't expect any of the changes introduced to fix this issue to have 
impact on start up time.

> Serialization of case classes containing a Map[String, Any] sometimes throws 
> ArrayIndexOutOfBoundsException
> ---
>
> Key: FLINK-11420
> URL: https://issues.apache.org/jira/browse/FLINK-11420
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Assignee: Dawid Wysakowicz
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.8.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> We frequently run into random ArrayIndexOutOfBounds exceptions when flink 
> tries to serialize Scala case classes containing a Map[String, Any] (Any 
> being String, Long, Int, or Boolean) with the FsStateBackend. (This probably 
> happens with any case class containing a type requiring Kryo, see this thread 
> for instance: 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e])
> Disabling asynchronous snapshots seems to work around the problem, so maybe 
> something is not thread-safe in CaseClassSerializer.
> Our objects look like this:
> {code}
> case class Event(timestamp: Long, [...], content: Map[String, Any]
> case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any])
> {code}
> I've looked at a few of the exceptions in a debugger. It always happens when 
> serializing the right-hand side of a tuple from EnrichedEvent -> Event -> 
> content, e.g. 13 from ("foo", 13) or false from ("bar", false).
> Stacktrace:
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0
>  at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>  at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>  at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>  at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69)
>  at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
>  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
>  at 
> org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
>  at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>  at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>  at 
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95)
>  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
>  at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>  at java.base/java.lang.Thread.run(Thread.java:834){code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788296#comment-16788296
 ] 

Jürgen Kreileder commented on FLINK-11654:
--

I agree with [~cslotterback] that it is a usable workaround, but it basically 
turns a formerly job-local UID into a UUID, which isn't very obvious to users.

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread Tim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788274#comment-16788274
 ] 

Tim commented on FLINK-11654:
-

I did try setting the UID to a random 32-character hex string, but it made no 
difference to the `transaction.id`. Maybe I did something wrong. I'm on Flink 
1.7.1, running on a single machine with 12 TMs (and 1 slot per TM).

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException

2019-03-08 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788272#comment-16788272
 ] 

Jürgen Kreileder commented on FLINK-11420:
--

Hi [~dawidwys],

I'm just playing around a bit with 1.8 without restoring any state. I'll give 
DEBUG a try.

Testing is a bit of a pain, though: the test jobs take around 10 minutes to 
actually start, whereas it took only seconds with 1.7. The culprit seems to be 
the new code compilation in TraversableSerializer. Is that an expected 
phenomenon?

> Serialization of case classes containing a Map[String, Any] sometimes throws 
> ArrayIndexOutOfBoundsException
> ---
>
> Key: FLINK-11420
> URL: https://issues.apache.org/jira/browse/FLINK-11420
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Assignee: Dawid Wysakowicz
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.8.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> We frequently run into random ArrayIndexOutOfBounds exceptions when flink 
> tries to serialize Scala case classes containing a Map[String, Any] (Any 
> being String, Long, Int, or Boolean) with the FsStateBackend. (This probably 
> happens with any case class containing a type requiring Kryo, see this thread 
> for instance: 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e])
> Disabling asynchronous snapshots seems to work around the problem, so maybe 
> something is not thread-safe in CaseClassSerializer.
> Our objects look like this:
> {code}
> case class Event(timestamp: Long, [...], content: Map[String, Any]
> case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any])
> {code}
> I've looked at a few of the exceptions in a debugger. It always happens when 
> serializing the right-hand side of a tuple from EnrichedEvent -> Event -> 
> content, e.g.: 13 from ("foo", 13) or false from ("bar", false).
> Stacktrace:
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0
>  at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>  at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>  at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>  at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69)
>  at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
>  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
>  at 
> org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
>  at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>  at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>  at 
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95)
>  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
>  at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>  at java.base/java.lang.Thread.run(Thread.java:834){code}
>  
>  
>  



--
This message was sent by Atlassian JIRA

[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788251#comment-16788251
 ] 

Dawid Wysakowicz commented on FLINK-11654:
--

Adding to the idea of making this setting mandatory: we recently introduced (in 
1.8) a flag that forces users to set UIDs for all operators in a job. See 
[FLINK-11653]
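
For illustration, a minimal sketch of how that flag could be used. The method 
name ({{ExecutionConfig#disableAutoGeneratedUIDs()}}) and the operator names are 
assumptions based on the linked ticket, not verified API:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnforceUidsSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// With auto-generated UIDs disabled, building a job that contains an operator
		// without an explicit uid is rejected, so every operator below is given one.
		env.getConfig().disableAutoGeneratedUIDs();

		env.fromElements(1, 2, 3).uid("numbers-source")
			.map(i -> i * 2).uid("double-values")
			.print().uid("stdout-sink");

		env.execute("uid-enforcement-sketch");
	}
}
{code}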

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException

2019-03-08 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788255#comment-16788255
 ] 

Dawid Wysakowicz commented on FLINK-11420:
--

Hi [~jkreileder]
Have you tried enabling DEBUG logs? Have you seen exceptions claiming 
concurrent access to KryoSerializer?

Another idea: are you trying to restore from a checkpoint taken with a previous 
version? I just wonder whether the checkpoint might be corrupted if it was taken 
while this bug was still present. [~srichter] What do you think?
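
(As a side note, the "disabling asynchronous snapshots" workaround from the 
report below amounts to something like the following sketch; the checkpoint URI 
is a placeholder.)

{code:java}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SyncSnapshotWorkaroundSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// The second constructor argument is asynchronousSnapshots: passing false makes
		// the FsStateBackend snapshot state synchronously, so the (possibly
		// non-thread-safe) serializers are not used concurrently during checkpoints.
		env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints", false));
		env.enableCheckpointing(10_000L);

		// ... rest of the job definition goes here ...
	}
}
{code}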

> Serialization of case classes containing a Map[String, Any] sometimes throws 
> ArrayIndexOutOfBoundsException
> ---
>
> Key: FLINK-11420
> URL: https://issues.apache.org/jira/browse/FLINK-11420
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Assignee: Dawid Wysakowicz
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.8.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> We frequently run into random ArrayIndexOutOfBounds exceptions when flink 
> tries to serialize Scala case classes containing a Map[String, Any] (Any 
> being String, Long, Int, or Boolean) with the FsStateBackend. (This probably 
> happens with any case class containing a type requiring Kryo, see this thread 
> for instance: 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cCANNGFpjX4gjV=df6tlfeojsb_rhwxs_ruoylqcqv2gvwqtt...@mail.gmail.com%3e])
> Disabling asynchronous snapshots seems to work around the problem, so maybe 
> something is not thread-safe in CaseClassSerializer.
> Our objects look like this:
> {code}
> case class Event(timestamp: Long, [...], content: Map[String, Any]
> case class EnrichedEvent(event: Event, additionalInfo: Map[String, Any])
> {code}
> I've looked at a few of the exceptions in a debugger. It always happens when 
> serializing the right-hand side of a tuple from EnrichedEvent -> Event -> 
> content, e.g.: 13 from ("foo", 13) or false from ("bar", false).
> Stacktrace:
> {code:java}
> java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 0
>  at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>  at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>  at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>  at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:69)
>  at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
>  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>  at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:465)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>  at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  at 
> org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
>  at 
> org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
>  at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>  at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>  at 
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95)
>  at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
>  at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>  at java.base/java.lang.Thread.run(Thread.java:834){code}
>  
>  
>  



--
This message was sent by Atlassian JIRA

[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788251#comment-16788251
 ] 

Dawid Wysakowicz edited comment on FLINK-11654 at 3/8/19 8:04 PM:
--

Adding to the idea of making this setting mandatory: we recently introduced (in 
1.9) a flag that forces users to set UIDs for all operators in a job. See 
[FLINK-11653]


was (Author: dawidwys):
Adding to the idea of making this setting mandatory: we recently introduced (in 
1.8) a flag that forces users to set UIDs for all operators in a job. See 
[FLINK-11653]

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11653) Add configuration to enforce custom UID's on datastream

2019-03-08 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-11653.

   Resolution: Fixed
Fix Version/s: 1.9.0

Implemented in 9c329484f9e8da58b3f0ac4e48348ba3e781dd08

> Add configuration to enforce custom UID's on datastream
> ---
>
> Key: FLINK-11653
> URL: https://issues.apache.org/jira/browse/FLINK-11653
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Current best practice when deploying Flink applications to production is to 
> set a custom UID, using DataStream#uid, so jobs can resume from savepoints 
> even if the job graph has been modified. Flink should contain a 
> configuration that can allow users to fail submission if their program 
> contains an operator without a custom UID; enforcing best practices similarly 
> to #disableGenericTypes.  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788250#comment-16788250
 ] 

Dawid Wysakowicz commented on FLINK-11654:
--

The workaround is to set different uids 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/savepoints.html#assigning-operator-ids
 for the kafka producers.

I would be against introducing an additional configuration parameter. It would 
add complexity not only in the code, but also additional corner cases when 
restoring state for an operator: what should happen if a user tries to restore 
state after changing the {{TransactionIdSeed}}? I think the operator id can 
serve this use case very well. I agree, though, that we should improve the 
documentation around this topic.
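
As a concrete illustration of the uid-based workaround, here is a minimal 
sketch. The topic, broker address and uid strings are made-up placeholders; the 
important part is that the uid differs between jobs writing to the same Kafka 
cluster:

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class DistinctUidSinkSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> orders = env.fromElements("order-1", "order-2");

		Properties kafkaProps = new Properties();
		kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

		FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
			"orders-topic",
			new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
			kafkaProps,
			FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

		orders.addSink(producer)
			// The uid determines the operator's unique ID, which goes into the
			// transactional-id prefix, so it must differ between jobs (and clusters)
			// writing to the same Kafka cluster.
			.uid("job-a-orders-kafka-sink")
			.name("orders-kafka-sink");

		env.execute("job-a");
	}
}
{code}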

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread Chris Slotterback (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788247#comment-16788247
 ] 

Chris Slotterback edited comment on FLINK-11654 at 3/8/19 7:59 PM:
---

Setting the {{UID}} does work in my testing to avoid transactionId collisions, 
as long as it remains unique across *every* producer writing to that topic. In 
a multi-datacenter configuration, I had collisions occurring across multiple 
flink clusters.

I do think that if setting the {{UID}} on the operator (or a seed) is the 
accepted workaround, it could certainly be made more obvious, or even required 
for users to configure when using {{Semantics.EXACTLY_ONCE}} producers.


was (Author: cslotterback):
I do think that if setting the {{UID}} on the operator (or a seed) is the 
accepted workaround, it could certainly be made more obvious, or even required 
for users to configure when using {{Semantics.EXACTLY_ONCE}} producers.

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread Chris Slotterback (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788247#comment-16788247
 ] 

Chris Slotterback commented on FLINK-11654:
---

I do think that if setting the {{UID}} on the operator (or a seed) is the 
accepted workaround, it could certainly be made more obvious, or even required 
for users to configure when using {{Semantics.EXACTLY_ONCE}} producers.

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread Tim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788239#comment-16788239
 ] 

Tim commented on FLINK-11654:
-

It is not clear to me what the _recommended_ workaround is.   Is it to name the 
task so that different jobs will end up with different transaction IDs?

FWIW - I think the TransactionIdSeed is a good idea in that it makes it clear 
to the user that a token is used to uniquely identify the pool of 
KafkaProducers.   I have not thought about if and how that would impact 
recovery and savepoints though.

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

2019-03-08 Thread Tim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16788239#comment-16788239
 ] 

Tim edited comment on FLINK-11654 at 3/8/19 7:40 PM:
-

It is not clear to me what the _recommended_ workaround is.   Is it to name the 
task so that different jobs will end up with different transaction IDs?

FWIW - I think the TransactionIdSeed is a good idea in that it makes it clear 
to the user that a token is used to uniquely identify the pool of 
KafkaProducers.   I have not thought about if and how that would impact 
recovery and savepoints though.

Also, adding my findings as well.   
http://mail-archives.apache.org/mod_mbox/flink-user/201903.mbox/%3C9E3F033F-0CB0-4659-A487-39BB728C4F01%40comcast.com%3E


was (Author: victtim):
It is not clear to me what the _recommended_ workaround is.   Is it to name the 
task so that different jobs will end up with different transaction IDs?

FWIW - I think the TransactionIdSeed is a good idea in that it makes it clear 
to the user that a token is used to uniquely identify the pool of 
KafkaProducers.   I have not thought about if and how that would impact 
recovery and savepoints though.

> Multiple transactional KafkaProducers writing to same cluster have clashing 
> transaction IDs
> ---
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Priority: Major
> Fix For: 1.9.0
>
>
> We run multiple jobs on a cluster which write a lot to the same Kafka topic 
> from identically named sinks. When EXACTLY_ONCE semantic is enabled for the 
> KafkaProducers we run into a lot of ProducerFencedExceptions and all jobs go 
> into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition finding-commands-dev-1-0 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 483 
> (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the 
> TransactionalIdsGenerator:
> The IDs are only guaranteed to be unique for a single Job. But they can clash 
> between different Jobs (and Clusters).
>  
>  
> {code:java}
> --- 
> a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ 
> b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer
>                 nextTransactionalIdHintState = 
> context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + 
> ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         
> getRuntimeContext().getNumberOfParallelSubtasks(),{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5479) Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions

2019-03-08 Thread Padarn Wilson (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787991#comment-16787991
 ] 

Padarn Wilson commented on FLINK-5479:
--

Hi all - Is this still an open issue that anyone is working on? Was thinking of 
taking a crack at it, but thought I'd check here first.

> Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions
> --
>
> Key: FLINK-5479
> URL: https://issues.apache.org/jira/browse/FLINK-5479
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.8.0
>
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-topic-partition-skewness-causes-watermark-not-being-emitted-td11008.html
> Similar to what's happening to idle sources blocking watermark progression in 
> downstream operators (see FLINK-5017), the per-partition watermark mechanism 
> in {{FlinkKafkaConsumer}} is also being blocked of progressing watermarks 
> when a partition is idle. The watermark of idle partitions is always 
> {{Long.MIN_VALUE}}, therefore the overall min watermark across all partitions 
> of a consumer subtask will never proceed.
> It's normally not a common case to have Kafka partitions not producing any 
> data, but it'll probably be good to handle this as well. I think we should 
> have a localized solution similar to FLINK-5017 for the per-partition 
> watermarks in {{AbstractFetcher}}.
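
To make the blocking effect described above concrete, a tiny self-contained 
sketch of the min-across-partitions computation (the watermark values are 
arbitrary):

{code:java}
public class IdlePartitionWatermarkSketch {

	public static void main(String[] args) {
		// Per-partition watermarks of one consumer subtask; the third partition is idle
		// and therefore stuck at Long.MIN_VALUE.
		long[] perPartitionWatermarks = {1_000L, 2_000L, Long.MIN_VALUE};

		long overall = Long.MAX_VALUE;
		for (long watermark : perPartitionWatermarks) {
			overall = Math.min(overall, watermark); // subtask watermark = min over partitions
		}

		// Prints Long.MIN_VALUE: the idle partition keeps the subtask watermark from advancing.
		System.out.println(overall);
	}
}
{code}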



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-03-08 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r263821841
 
 

 ##
 File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSubscriberFactoryForEmulator.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import io.grpc.ManagedChannel;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
+
+import java.io.IOException;
+
+/**
+ * A convenience PubSubSubscriberFactory that can be used to connect to a 
PubSub emulator.
+ * The PubSub emulators do not support SSL or Credentials and as such this 
SubscriberStub does not require or provide this.
+ */
+public class PubSubSubscriberFactoryForEmulator implements 
PubSubSubscriberFactory {
 
 Review comment:
   Do you mean `flink-test-utils`? That sounds like a good idea. What do I 
need to change to make sure it ends up there?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Reopened] (FLINK-11420) Serialization of case classes containing a Map[String, Any] sometimes throws ArrayIndexOutOfBoundsException

2019-03-08 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/FLINK-11420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jürgen Kreileder reopened FLINK-11420:
--

Just made a test run with 1.8-SNAPSHOT (commit 9d87fe1) and I've seen a few of 
those exceptions again:
{code:java}
java.lang.ArrayIndexOutOfBoundsException: -1
at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:90)
at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:230)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:90)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.$anonfun$copy$1(TraversableSerializer.scala:90)
at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:230)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:461)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:90)
at 
org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at 
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:537)
at 
org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.copy(CoGroupedStreams.java:506)
at 
org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:99)
at 
org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:297)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:321)
at 
org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:95)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:391)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
{code}

> Serialization of case classes containing a Map[String, Any] sometimes throws 
> ArrayIndexOutOfBoundsException
> ---
>
> Key: FLINK-11420
> URL: https://issues.apache.org/jira/browse/FLINK-11420
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.7.1
>Reporter: Jürgen Kreileder
>Assignee: Dawid Wysakowicz
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.8.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> We frequently run into random ArrayIndexOutOfBounds exceptions when flink 
> tries to serialize Scala case classes containing a Map[String, Any] (Any 
> being String, Long, Int, or Boolean) with the FsStateBackend. (This probably 
> happens with any case class containing a type requiring Kryo, see this thread 
> for instance: 
> 

[jira] [Created] (FLINK-11863) Introduce channel to read and write compressed data

2019-03-08 Thread Kurt Young (JIRA)
Kurt Young created FLINK-11863:
--

 Summary: Introduce channel to read and write compressed data
 Key: FLINK-11863
 URL: https://issues.apache.org/jira/browse/FLINK-11863
 Project: Flink
  Issue Type: New Feature
Reporter: Kurt Young
Assignee: Kurt Young






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11861) JobMasterTriggerSavepointIT case is not executed

2019-03-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11861:
---
Labels: pull-request-available  (was: )

> JobMasterTriggerSavepointIT case is not executed
> 
>
> Key: FLINK-11861
> URL: https://issues.apache.org/jira/browse/FLINK-11861
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.2
>Reporter: shuai.xu
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
>
> The 
> [JobMasterTriggerSavepointIT|https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java]
>  will not be executed as the case name does not follow the style ***ITCase.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #7943: [FLINK-11861][tests] Fix JobMasterTriggerSavepointIT not executed

2019-03-08 Thread GitBox
flinkbot commented on issue #7943: [FLINK-11861][tests] Fix 
JobMasterTriggerSavepointIT not executed
URL: https://github.com/apache/flink/pull/7943#issuecomment-470903120
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] TisonKun opened a new pull request #7943: [FLINK-11861][tests] Fix JobMasterTriggerSavepointIT not executed

2019-03-08 Thread GitBox
TisonKun opened a new pull request #7943: [FLINK-11861][tests] Fix 
JobMasterTriggerSavepointIT not executed
URL: https://github.com/apache/flink/pull/7943
 
 
   ## What is the purpose of the change
   
   Reported by @shuai-xu, `JobMasterTriggerSavepointIT` doesn't follow the 
pattern "*ITCase.*" and thus is not executed in the `tests` profile (it should 
be).
   
   Renaming `JobMasterTriggerSavepointIT` to `JobMasterTriggerSavepointITCase` 
should resolve this issue.
   
   ```
   <execution>
      <id>integration-tests</id>
      <phase>integration-test</phase>
      <goals>
         <goal>test</goal>
      </goals>
      <configuration>
         <includes>
            <include>**/*ITCase.*</include>
         </includes>
         <reuseForks>false</reuseForks>
      </configuration>
   </execution>
   ```
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector:(no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   
   cc @tillrohrmann 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-11861) JobMasterTriggerSavepointIT case is not executed

2019-03-08 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-11861:


Assignee: TisonKun

> JobMasterTriggerSavepointIT case is not executed
> 
>
> Key: FLINK-11861
> URL: https://issues.apache.org/jira/browse/FLINK-11861
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.2
>Reporter: shuai.xu
>Assignee: TisonKun
>Priority: Major
>
> The 
> [JobMasterTriggerSavepointIT|https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java]
>  will not be executed as the case name does not follow the style ***ITCase.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #7713: [FLINK-10995][network] Copy intermediate serialization results only once for broadcast mode

2019-03-08 Thread GitBox
zhijiangW commented on a change in pull request #7713: [FLINK-10995][network] 
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r263749333
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 ##
 @@ -44,4 +52,67 @@ public BroadcastRecordWriter(
public void emit(T record) throws IOException, InterruptedException {
broadcastEmit(record);
}
+
+   @Override
+   public void broadcastEmit(T record) throws IOException, 
InterruptedException {
 
 Review comment:
   Yes, you are right. We do not need to hardcode the size of the latency 
marker. :)
   
   I have submitted the code in a separate commit. I am not sure whether this 
change is suitable based on our comments, so I only refactored the main 
processes and left the tests ignored via annotations. After your review and 
confirmation, I will clean up the commits and tests.
   
   I think it is better to explain some thoughts below:
   
   In order to reuse the main process of 
`RecordWriter#copyFromSerializerToTargetChannel`, the `randomTriggered` field 
is still maintained in `BroadcastRecordWriter`, but I think the logic is easier 
to understand than in the first version.
   
   After abstracting `RecordWriter` into two separate implementations, the 
changes seem more complex, because many abstract methods are involved in 
`BufferBuilder` and `ChannelSelector`, which are the main differences between 
the two implementations. 
   
   I am not sure whether you can accept the way of passing the magic 0 in 
`BroadcastRecordWriter#broadcastEmit` for reusing the common code. If we do 
not pass the target channel in the broadcast operation, we might need to 
re-implement all the related processes in `BroadcastRecordWriter`.
   
   I think my first version is easier to follow if we only change the process 
of `BroadcastRecordWriter#randomEmit`, as it is now. Maybe you have other good 
ideas. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11818) Provide pipe transformation function for DataSet API

2019-03-08 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787763#comment-16787763
 ] 

vinoyang commented on FLINK-11818:
--

Hi [~hequn8128], in fact my idea is not much different from the current 
implementation in Spark.

1) We can provide multiple overloaded methods called pipe on the DataSet 
object, e.g. {{pipe(String cmd)}}, {{pipe(String cmd, Map env)}}, ... 
Flink feeds the data to the external program and reads the output of the 
external program back as a new DataSet (see the sketch below). [1] [2]

2) I think its semantics are similar to Spark.

 

[1]: 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala]

[2]: 
[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala]

 

What do you think? cc [~fhueske] [~till.rohrmann]
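
A minimal sketch of how such a pipe could be realized on top of the existing 
DataSet API, assuming a mapPartition-based implementation; {{PipeMapPartition}} 
is a hypothetical helper, not an existing Flink or Spark class:

{code:java}
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.util.Collector;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical helper: feeds each partition to an external program's stdin and
// emits every line of its stdout as a new record.
public class PipeMapPartition extends RichMapPartitionFunction<String, String> {

	private final String[] command;
	private final Map<String, String> env;

	public PipeMapPartition(String[] command, Map<String, String> env) {
		this.command = command;
		this.env = env;
	}

	@Override
	public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
		ProcessBuilder builder = new ProcessBuilder(command);
		if (env != null) {
			builder.environment().putAll(env);
		}
		Process process = builder.start();

		// Write the partition to stdin. A production version would read stdout
		// concurrently to avoid blocking when the pipe buffer fills up.
		try (BufferedWriter stdin = new BufferedWriter(
				new OutputStreamWriter(process.getOutputStream(), StandardCharsets.UTF_8))) {
			for (String value : values) {
				stdin.write(value);
				stdin.newLine();
			}
		}

		// Emit every line of the external program's stdout as a new record.
		try (BufferedReader stdout = new BufferedReader(
				new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
			String line;
			while ((line = stdout.readLine()) != null) {
				out.collect(line);
			}
		}
		process.waitFor();
	}
}
{code}

A {{pipe(String cmd)}} overload on DataSet could then simply delegate to 
{{mapPartition(new PipeMapPartition(...))}} with the command split into tokens.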

 

> Provide pipe transformation function for DataSet API
> 
>
> Key: FLINK-11818
> URL: https://issues.apache.org/jira/browse/FLINK-11818
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> We have some business requirements that require the data handled by Flink to 
> interact with some external programs (such as Python/Perl/shell scripts). 
> There is no such function in the existing DataSet API; although it can be 
> implemented with the map function, that is not concise. It would be helpful 
> if we could provide a pipe[1] function like Spark.
> [1]: 
> https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11862) multiple different queries on the same stream cause the WHERE condition of the second query to fail

2019-03-08 Thread zhengbm (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhengbm updated FLINK-11862:

Description: 
List fields = Lists.newArrayList("rawMessage","timestamp");
 Schema schema = new Schema();
 for (int i = 0; i < fields.size(); i++)

{ schema.field(fields.get(i), Types.STRING()).from(fields.get(i)); }

tableEnvironment.connect(new Kafka()
 .version("0.8")
 .properties(properties)
 .topic("raw_playtime_h5_source")
 .startFromLatest()
 )
 .withFormat(new Json().failOnMissingField(false).deriveSchema())
 .withSchema(schema)
 .inAppendMode()
 .registerTableSource("t1");

Table table2 = tableEnvironment
 .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as 
flash from t1 ,LATERAL TABLE(split(rawMessage,'
 t')) as T(maps) ");

tableEnvironment.registerTable("t2", table2);

Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2 
where maps_length>0");

TypeInformation typeInformation = table.getSchema().toRowType();

String[] columns = table.getSchema().getFieldNames();
 DataStream dataStream = tableEnvironment
 .toAppendStream(table, typeInformation)
 .map(new PhysicTransformMap(columns, 0));

dataStream.print();

try

{ env.execute(); }

catch (Exception e)

{ e.printStackTrace(); }

{color:#d04437}Note: the Kafka messages look like \{"timestamp" : 
"","rawMessage":"xxx\txxx\t\t"} (if I delete the where condition of the second 
query it works; "select `timestamp`,maps_length from t2 " or "select 
`timestamp`,maps_length from t2 where timestamp>0" are also ok){color}

{color:#d04437}The exception trace is as follows{color}

Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: 
Invalid input access.
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
 at scala.Option.getOrElse(Option.scala:120)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
 at 
org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
 at 
org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:894)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:864)
 at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:224)
 at 

[jira] [Updated] (FLINK-11862) multiple different queries on the same stream cause the WHERE condition of the second query to fail

2019-03-08 Thread zhengbm (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhengbm updated FLINK-11862:

Summary: multiple different queries on the same stream cause the WHERE 
condition of the second query to fail  (was: when running multiple different SQL 
statements on the same stream, the WHERE condition of the second SQL is unusable)

> multiple different queries on the same stream cause the WHERE condition of 
> the second query to fail
> --
>
> Key: FLINK-11862
> URL: https://issues.apache.org/jira/browse/FLINK-11862
> Project: Flink
>  Issue Type: Bug
>  Components: API / Table SQL
>Affects Versions: 1.7.2
> Environment: flink 1.7, java 1.8
>Reporter: zhengbm
>Priority: Major
>
> List fields = Lists.newArrayList("rawMessage","timestamp");
>  Schema schema = new Schema();
>  for (int i = 0; i < fields.size(); i++)
> { schema.field(fields.get(i), Types.STRING()).from(fields.get(i)); }
> tableEnvironment.connect(new Kafka()
>  .version("0.8")
>  .properties(properties)
>  .topic("raw_playtime_h5_source")
>  .startFromLatest()
>  )
>  .withFormat(new Json().failOnMissingField(false).deriveSchema())
>  .withSchema(schema)
>  .inAppendMode()
>  .registerTableSource("t1");
> Table table2 = tableEnvironment
>  .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as 
> flash from t1 ,LATERAL TABLE(split(rawMessage,'
>  t')) as T(maps) ");
> tableEnvironment.registerTable("t2", table2);
> Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from 
> t2 where maps_length>0");
> TypeInformation typeInformation = table.getSchema().toRowType();
> String[] columns = table.getSchema().getFieldNames();
>  DataStream dataStream = tableEnvironment
>  .toAppendStream(table, typeInformation)
>  .map(new PhysicTransformMap(columns, 0));
> dataStream.print();
> try
> { env.execute(); }
> catch (Exception e)
> { e.printStackTrace(); }
> {color:#d04437}Note: the Kafka messages look like \{"timestamp" : 
> "","rawMessage":"xxx\txxx\t\t"} (if the where condition is changed to timestamp it works, and if the where condition is removed the program also runs normally). The split function splits a string into a map structure.{color}
> The error stack trace is as follows
> Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: 
> Invalid input access.
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
>  at scala.Option.getOrElse(Option.scala:120)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66)
>  at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
>  at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
>  at 
> org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
>  at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
>  at 
> org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
>  at 
> org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155)
>  at 
> 

[jira] [Updated] (FLINK-11862) when running multiple different SQL statements on the same stream, the WHERE condition of the second SQL is unusable

2019-03-08 Thread zhengbm (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhengbm updated FLINK-11862:

Description: 
List fields = Lists.newArrayList("rawMessage","timestamp");
 Schema schema = new Schema();
 for (int i = 0; i < fields.size(); i++)

{ schema.field(fields.get(i), Types.STRING()).from(fields.get(i)); }

tableEnvironment.connect(new Kafka()
 .version("0.8")
 .properties(properties)
 .topic("raw_playtime_h5_source")
 .startFromLatest()
 )
 .withFormat(new Json().failOnMissingField(false).deriveSchema())
 .withSchema(schema)
 .inAppendMode()
 .registerTableSource("t1");

Table table2 = tableEnvironment
 .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as 
flash from t1 ,LATERAL TABLE(split(rawMessage,'
 t')) as T(maps) ");

tableEnvironment.registerTable("t2", table2);

Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2 
where maps_length>0");

TypeInformation typeInformation = table.getSchema().toRowType();

String[] columns = table.getSchema().getFieldNames();
 DataStream dataStream = tableEnvironment
 .toAppendStream(table, typeInformation)
 .map(new PhysicTransformMap(columns, 0));

dataStream.print();

try

{ env.execute(); }

catch (Exception e)

{ e.printStackTrace(); }

{color:#d04437}Note: the Kafka messages look like \{"timestamp" : 
"","rawMessage":"xxx\txxx\t\t"} (if the where condition is changed to timestamp it works, and if the where condition is removed the program also runs normally). The split function splits a string into a map structure.{color}

The error stack trace is as follows

Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: 
Invalid input access.
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
 at scala.Option.getOrElse(Option.scala:120)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
 at 
org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
 at 
org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:894)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:864)
 at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:224)
 at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:173)

  was:
List fields = Lists.newArrayList("rawMessage","timestamp");
 

[jira] [Updated] (FLINK-11862) when running multiple different SQL statements on the same stream, the WHERE condition of the second SQL is unusable

2019-03-08 Thread zhengbm (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhengbm updated FLINK-11862:

Description: 
List fields = Lists.newArrayList("rawMessage","timestamp");
 Schema schema = new Schema();
 for (int i = 0; i < fields.size(); i++)

{ schema.field(fields.get(i), Types.STRING()).from(fields.get(i)); }

tableEnvironment.connect(new Kafka()
 .version("0.8")
 .properties(properties)
 .topic("raw_playtime_h5_source")
 .startFromLatest()
 )
 .withFormat(new Json().failOnMissingField(false).deriveSchema())
 .withSchema(schema)
 .inAppendMode()
 .registerTableSource("t1");

Table table2 = tableEnvironment
 .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as 
flash from t1 ,LATERAL TABLE(split(rawMessage,'
t')) as T(maps) ");

tableEnvironment.registerTable("t2", table2);

Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2 
where maps_length>0");

TypeInformation typeInformation = table.getSchema().toRowType();

String[] columns = table.getSchema().getFieldNames();
 DataStream dataStream = tableEnvironment
 .toAppendStream(table, typeInformation)
 .map(new PhysicTransformMap(columns, 0));

dataStream.print();

try

{ env.execute(); }

catch (Exception e)

{ e.printStackTrace(); }

{color:#d04437}Note: the Kafka messages look like \{"timestamp" : 
"","rawMessage":"xxx\txxx\t\t"} (if the where condition is changed to timestamp it works, and if the where condition is removed the program runs normally){color}

The error stack trace is as follows

Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: 
Invalid input access.
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
 at scala.Option.getOrElse(Option.scala:120)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
 at 
org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
 at 
org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:894)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:864)
 at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:224)
 at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:173)

  was:
List fields = Lists.newArrayList("rawMessage","timestamp");
Schema schema = new Schema();

[jira] [Created] (FLINK-11862) when running multiple different SQL statements on the same stream, the WHERE condition of the second SQL is unusable

2019-03-08 Thread zhengbm (JIRA)
zhengbm created FLINK-11862:
---

 Summary: when running multiple different SQL statements on the same stream, the WHERE condition of the second SQL is unusable
 Key: FLINK-11862
 URL: https://issues.apache.org/jira/browse/FLINK-11862
 Project: Flink
  Issue Type: Bug
  Components: API / Table SQL
Affects Versions: 1.7.2
 Environment: flink 1.7, java 1.8
Reporter: zhengbm


List fields = Lists.newArrayList("rawMessage","timestamp");
Schema schema = new Schema();
for (int i = 0; i < fields.size(); i++) {
 schema.field(fields.get(i), Types.STRING()).from(fields.get(i));
}
tableEnvironment.connect(new Kafka()
 .version("0.8")
 .properties(properties)
 .topic("raw_playtime_h5_source")
 .startFromLatest()
 )
 .withFormat(new Json().failOnMissingField(false).deriveSchema())
 .withSchema(schema)
 .inAppendMode()
 .registerTableSource("t1");

Table table2 = tableEnvironment
 .sqlQuery("select maps,`timestamp`,CARDINALITY(maps) AS maps_length ,1 as 
flash from t1 ,LATERAL TABLE(split(rawMessage,'\\t')) as T(maps) ");

tableEnvironment.registerTable("t2", table2);

Table table = tableEnvironment.sqlQuery("select `timestamp`,maps_length from t2 
where maps_length>0");

TypeInformation typeInformation = table.getSchema().toRowType();

String[] columns = table.getSchema().getFieldNames();
DataStream dataStream = tableEnvironment
 .toAppendStream(table, typeInformation)
 .map(new PhysicTransformMap(columns, 0));

dataStream.print();

try {
 env.execute();
} catch (Exception e) {
 e.printStackTrace();
}

Note: the Kafka messages look like \{"timestamp" : "","rawMessage":"xxx\txxx\t\t"}

 

Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: 
Invalid input access.
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$15.apply(CodeGenerator.scala:587)
 at scala.Option.getOrElse(Option.scala:120)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:587)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitInputRef(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
 at 
org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:744)
 at 
org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
 at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
 at 
org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
 at 
org.apache.flink.table.plan.nodes.CommonCorrelate$class.generateCollector(CommonCorrelate.scala:155)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.generateCollector(DataStreamCorrelate.scala:38)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate.translateToPlan(DataStreamCorrelate.scala:116)
 at 
org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:967)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:894)
 at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:864)
 at 
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:224)
 at 

[jira] [Created] (FLINK-11861) JobMasterTriggerSavepointIT case is not executed

2019-03-08 Thread shuai.xu (JIRA)
shuai.xu created FLINK-11861:


 Summary: JobMasterTriggerSavepointIT case is not executed
 Key: FLINK-11861
 URL: https://issues.apache.org/jira/browse/FLINK-11861
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.7.2
Reporter: shuai.xu


The 
[JobMasterTriggerSavepointIT|https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java]
 will not be executed as the case name does not follow the style ***ITCase.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11858) Introduce block compressor/decompressor for batch table runtime

2019-03-08 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-11858.
--
   Resolution: Implemented
Fix Version/s: 1.9.0

fixed in 2345bbe0c8cfc1af75b8faef243dcde11777d5b7

> Introduce block compressor/decompressor for batch table runtime
> ---
>
> Key: FLINK-11858
> URL: https://issues.apache.org/jira/browse/FLINK-11858
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Operators
>Reporter: Kurt Young
>Assignee: Kurt Young
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] KurtYoung closed pull request #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime

2019-03-08 Thread GitBox
KurtYoung closed pull request #7941: [FLINK-11858][table-runtime-blink] 
Introduce block compression to batch table runtime
URL: https://github.com/apache/flink/pull/7941
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10575) Remove deprecated ExecutionGraphBuilder.buildGraph method

2019-03-08 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787698#comment-16787698
 ] 

TisonKun commented on FLINK-10575:
--

[~till.rohrmann] Besides removing the deprecated method, I notice that the parameter 
{{prior}} is always {{null}}. Is it still a valid concept that we might attach a 
job graph to an existing execution graph?


{code:java}
   /**
 * Builds the ExecutionGraph from the JobGraph.
 * If a prior execution graph exists, the JobGraph will be attached. If 
no prior execution
 * graph exists, then the JobGraph will become attach to a new empty 
execution graph.
 */
public static ExecutionGraph buildGraph(
@Nullable ExecutionGraph prior,
{code}


> Remove deprecated ExecutionGraphBuilder.buildGraph method
> -
>
> Key: FLINK-10575
> URL: https://issues.apache.org/jira/browse/FLINK-10575
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.7.0
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>
> ExecutionGraphBuilder is not a public API and we should be able to remove 
> deprecated methods such as:
> @Deprecated
> public static ExecutionGraph buildGraph
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11837) Improve internal data format

2019-03-08 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-11837.
--
   Resolution: Implemented
Fix Version/s: 1.9.0

fixed in 043540bfd29ff1886c83c71c7b178d1261fa35bd

> Improve internal data format
> 
>
> Key: FLINK-11837
> URL: https://issues.apache.org/jira/browse/FLINK-11837
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Operators
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> In FLINK-11701, we introduced an abstract set of internal data formats in 
> table-runtime-blink. This JIRA is a complement to it.
> Introduce Decimal: the scale of this object is specified by the user, not 
> determined automatically (as in BigDecimal).
> Introduce BinaryGeneric (GenericType): we don't care about the specific types; we 
> work through serializers.
> Introduce LazyBinaryFormat: it can exist in two formats, binary or Java object.
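
As a rough illustration of the lazy binary idea quoted above, a value that can 
exist either as serialized bytes or as a materialized Java object might look like 
the following sketch; the class and method names are hypothetical, not the actual 
blink runtime classes.

{code:java}
// Hypothetical serializer contract used by the sketch below.
interface LazySerializer<T> {
	byte[] serialize(T value);
	T deserialize(byte[] bytes);
}

// Hypothetical sketch: a value that lazily converts between its binary and
// Java-object representations and caches whichever form it produces.
public final class LazyValue<T> {

	private byte[] binary;                    // serialized form, may be null
	private T javaObject;                     // materialized form, may be null
	private final LazySerializer<T> serializer;

	public LazyValue(byte[] binary, LazySerializer<T> serializer) {
		this.binary = binary;
		this.serializer = serializer;
	}

	public LazyValue(T javaObject, LazySerializer<T> serializer) {
		this.javaObject = javaObject;
		this.serializer = serializer;
	}

	/** Deserialize on first access and cache the Java object. */
	public T get() {
		if (javaObject == null) {
			javaObject = serializer.deserialize(binary);
		}
		return javaObject;
	}

	/** Serialize on first access and cache the binary form. */
	public byte[] getBinary() {
		if (binary == null) {
			binary = serializer.serialize(javaObject);
		}
		return binary;
	}
}
{code}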



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] KurtYoung closed pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format

2019-03-08 Thread GitBox
KurtYoung closed pull request #7913: [FLINK-11837][table-runtime-blink] Improve 
internal data format
URL: https://github.com/apache/flink/pull/7913
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on issue #7913: [FLINK-11837][table-runtime-blink] Improve internal data format

2019-03-08 Thread GitBox
KurtYoung commented on issue #7913: [FLINK-11837][table-runtime-blink] Improve 
internal data format
URL: https://github.com/apache/flink/pull/7913#issuecomment-470860725
 
 
   LGTM, merging...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10575) Remove deprecated ExecutionGraphBuilder.buildGraph method

2019-03-08 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787697#comment-16787697
 ] 

TisonKun commented on FLINK-10575:
--

[~isunjin] We can proceed with this JIRA now. Since you seem to have been inactive 
for some time, I'd like to take it over in the coming days.

> Remove deprecated ExecutionGraphBuilder.buildGraph method
> -
>
> Key: FLINK-10575
> URL: https://issues.apache.org/jira/browse/FLINK-10575
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.7.0
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>
> ExecutionGraphBuilder is not a public API and we should be able to remove 
> deprecated methods such as:
> @Deprecated
> public static ExecutionGraph buildGraph
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11857) Introduce BinaryExternalSorter to batch table runtime

2019-03-08 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-11857:
---
Description: We need a sorter to take full advantage of the high 
performance of the Binary format.

> Introduce BinaryExternalSorter to batch table runtime
> -
>
> Key: FLINK-11857
> URL: https://issues.apache.org/jira/browse/FLINK-11857
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Operators
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>
> We need a sorter to take full advantage of the high performance of the Binary 
> format.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11857) Introduce BinaryExternalSorter to batch table runtime

2019-03-08 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-11857:
---
Environment: (was: We need a sorter to take full advantage of the high 
performance of the Binary format.)

> Introduce BinaryExternalSorter to batch table runtime
> -
>
> Key: FLINK-11857
> URL: https://issues.apache.org/jira/browse/FLINK-11857
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Operators
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] KurtYoung commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes

2019-03-08 Thread GitBox
KurtYoung commented on a change in pull request #7931: [FLINK-11854] 
[table-planner-blink] Introduce batch physical nodes
URL: https://github.com/apache/flink/pull/7931#discussion_r263702948
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelNodeUtil.scala
 ##
 @@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.table.CalcitePair
+import org.apache.flink.table.`type`.InternalType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.dataview.DataViewSpec
+import org.apache.flink.table.functions.{AggregateFunction, 
UserDefinedFunction}
+import org.apache.flink.table.plan.FlinkJoinRelType
+import org.apache.flink.table.plan.nodes.ExpressionFormat
+import org.apache.flink.table.plan.nodes.ExpressionFormat.ExpressionFormat
+import org.apache.flink.table.plan.nodes.physical.batch.BatchPhysicalRel
+import org.apache.flink.table.typeutils.BinaryRowSerializer
+
+import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Window.Group
+import org.apache.calcite.rel.core.{AggregateCall, Calc, Window}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelFieldCollation, RelNode, RelWriter}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexNode, 
RexProgram, RexWindowBound}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.mapping.IntPair
+
+import java.util
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * Utility methods for RelNode.
+  */
+object RelNodeUtil {
 
 Review comment:
   BTW, the scope suggested by this class name is also too big


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #7931: [FLINK-11854] [table-planner-blink] Introduce batch physical nodes

2019-03-08 Thread GitBox
KurtYoung commented on a change in pull request #7931: [FLINK-11854] 
[table-planner-blink] Introduce batch physical nodes
URL: https://github.com/apache/flink/pull/7931#discussion_r263702820
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/RelNodeUtil.scala
 ##
 @@ -0,0 +1,630 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.util
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.table.CalcitePair
+import org.apache.flink.table.`type`.InternalType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.dataview.DataViewSpec
+import org.apache.flink.table.functions.{AggregateFunction, 
UserDefinedFunction}
+import org.apache.flink.table.plan.FlinkJoinRelType
+import org.apache.flink.table.plan.nodes.ExpressionFormat
+import org.apache.flink.table.plan.nodes.ExpressionFormat.ExpressionFormat
+import org.apache.flink.table.plan.nodes.physical.batch.BatchPhysicalRel
+import org.apache.flink.table.typeutils.BinaryRowSerializer
+
+import org.apache.calcite.plan.{RelOptCost, RelOptPlanner}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Window.Group
+import org.apache.calcite.rel.core.{AggregateCall, Calc, Window}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelFieldCollation, RelNode, RelWriter}
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexLiteral, RexNode, 
RexProgram, RexWindowBound}
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.util.mapping.IntPair
+
+import java.util
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * Utility methods for RelNode.
+  */
+object RelNodeUtil {
 
 Review comment:
   I think this utility class should focus on operator name formatting; things 
like computing the cost should be operator-specific


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit merged pull request #7923: [FLINK-11703][table-planner-blink] Introduce TableEnvironments and support registerDataStream and sqlQuery

2019-03-08 Thread GitBox
asfgit merged pull request #7923: [FLINK-11703][table-planner-blink] Introduce 
TableEnvironments and support registerDataStream and sqlQuery
URL: https://github.com/apache/flink/pull/7923
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11703) Introduce TableEnvironment and support registerDataStream and sqlQuery

2019-03-08 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-11703.
---
Resolution: Fixed

Fixed in 1.9: 586b890237bc2516fa202bfb8c89f5dea4209143

> Introduce TableEnvironment and support registerDataStream and sqlQuery
> --
>
> Key: FLINK-11703
> URL: https://issues.apache.org/jira/browse/FLINK-11703
> Project: Flink
>  Issue Type: New Feature
>  Components: SQL / Planner
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> In order to support plan tests, we need the ability to register a DataStream 
> (or TableSource) and perform a SQL query on it.
> This issue aims to introduce a basic and simple TableEnvironment which 
> only supports {{registerDataStream}} and {{sqlQuery}}.
> In the context of FLINK-11067 and FLINK-11068 (converting TableEnvironments and 
> Tables to interfaces), the introduced {{TableEnvironment}} in 
> {{table-planner-blink}} is a temporary solution and will be refactored/moved 
> to a {{BlinkPlanner}} once FLINK-11067 and FLINK-11068 are done.
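
A rough usage sketch of the two capabilities described above, written against the 
shape of the pre-existing flink-table API; the concrete class names and factory 
methods of the temporary blink {{TableEnvironment}} may well differ.

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Sketch under the assumption that the blink TableEnvironment mirrors the
// existing registerDataStream/sqlQuery methods of the flink-table API.
public class PlanTestSketch {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<Tuple2<Long, String>> input = env.fromElements(
				Tuple2.of(1L, "flink"), Tuple2.of(2L, "blink"));

		// Register the stream under a name and run a SQL query against it.
		StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
		tEnv.registerDataStream("t1", input, "id, name");
		Table result = tEnv.sqlQuery("SELECT id, name FROM t1 WHERE id > 1");
	}
}
{code}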



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11068) Convert the API classes *Table, *Window to interfaces

2019-03-08 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787669#comment-16787669
 ] 

Hequn Cheng commented on FLINK-11068:
-

[~twalthr] Hi, I have almost finished this issue. I think it's good to sync 
with you first, then submit the PR once the Expression PR is finished. The main 
issues that need to be confirmed with you are as follows:

1. Create only one Table interface in flink-table-api-java and rename the 
current Table to TableImpl. 

2. Considering {{OverWindowedTable window(OverWindow... overWindows)}} in 
Table, the {{WindowedTable window(Window window)}} may be confusing to users. 
From the API, it looks as if OverWindow were a subclass of Window; however, the 
Window here is actually a GroupWindow, so maybe we can change the API from
{code:java}
WindowedTable window(Window window);
{code}
to
{code:java}
GroupWindowedTable window(GroupWindow groupWindow);
{code}
and the two APIs, when put together, are as follows, which I think is 
clearer:
{code:java}
GroupWindowedTable window(GroupWindow groupWindow);
OverWindowedTable window(OverWindow... overWindows);
{code}
3. TableConversions.scala and other conversion classes will stay in the current 
folder for now and be moved into the api module once TableEnvironment has been 
converted into an interface.

Would be great to have your suggestions. Thank you!

> Convert the API classes *Table, *Window to interfaces
> -
>
> Key: FLINK-11068
> URL: https://issues.apache.org/jira/browse/FLINK-11068
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Table SQL
>Reporter: Timo Walther
>Assignee: Hequn Cheng
>Priority: Major
>
> A more detailed description can be found in 
> [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions].
> This includes: Table, GroupedTable, WindowedTable, WindowGroupedTable, 
> OverWindowedTable, Window, OverWindow
> We can keep the "Table" Scala implementation in a planner module until it has 
> been converted to Java.
> We can add a method to the planner later to give us a concrete instance. This 
> is one possibility to have a smooth transition period instead of changing all 
> classes at once.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wuchong commented on issue #7923: [FLINK-11703][table-planner-blink] Introduce TableEnvironments and support registerDataStream and sqlQuery

2019-03-08 Thread GitBox
wuchong commented on issue #7923: [FLINK-11703][table-planner-blink] Introduce 
TableEnvironments and support registerDataStream and sqlQuery
URL: https://github.com/apache/flink/pull/7923#issuecomment-470849645
 
 
   The failed python test is not related to this PR.
   
   Merging...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on issue #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime

2019-03-08 Thread GitBox
JingsongLi commented on issue #7941: [FLINK-11858][table-runtime-blink] 
Introduce block compression to batch table runtime
URL: https://github.com/apache/flink/pull/7941#issuecomment-470848379
 
 
   +1 LGTM


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on issue #7941: [FLINK-11858][table-runtime-blink] Introduce block compression to batch table runtime

2019-03-08 Thread GitBox
KurtYoung commented on issue #7941: [FLINK-11858][table-runtime-blink] 
Introduce block compression to batch table runtime
URL: https://github.com/apache/flink/pull/7941#issuecomment-470848112
 
 
   Hi @JingsongLi , thanks for the review, I have addressed your comment.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #7913: [FLINK-11837][table-runtime-blink] Improve internal data format

2019-03-08 Thread GitBox
JingsongLi commented on a change in pull request #7913: 
[FLINK-11837][table-runtime-blink] Improve internal data format
URL: https://github.com/apache/flink/pull/7913#discussion_r263696844
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
 ##
 @@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+/**
+ * Join two row to one row.
+ */
+public final class JoinedRow implements BaseRow {
+
+   private BaseRow row1;
+   private BaseRow row2;
+   private byte header;
+
+   public JoinedRow() {}
+
+   public JoinedRow(BaseRow row1, BaseRow row2) {
+   this.row1 = row1;
+   this.row2 = row2;
+   }
+
+   public JoinedRow replace(BaseRow row1, BaseRow row2) {
+   this.row1 = row1;
+   this.row2 = row2;
+   return this;
+   }
+
+   @Override
+   public int getArity() {
+   return row1.getArity() + row2.getArity();
+   }
+
+   @Override
+   public byte getHeader() {
+   return header;
+   }
+
+   @Override
+   public void setHeader(byte header) {
+   this.header = header;
+   }
+
+   @Override
+   public boolean isNullAt(int i) {
+   if (i < row1.getArity()) {
+   return row1.isNullAt(i);
+   } else {
+   return row2.isNullAt(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public boolean getBoolean(int i) {
+   if (i < row1.getArity()) {
+   return row1.getBoolean(i);
+   } else {
+   return row2.getBoolean(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public byte getByte(int i) {
+   if (i < row1.getArity()) {
+   return row1.getByte(i);
+   } else {
+   return row2.getByte(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public short getShort(int i) {
+   if (i < row1.getArity()) {
+   return row1.getShort(i);
+   } else {
+   return row2.getShort(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public int getInt(int i) {
+   if (i < row1.getArity()) {
+   return row1.getInt(i);
+   } else {
+   return row2.getInt(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public long getLong(int i) {
+   if (i < row1.getArity()) {
+   return row1.getLong(i);
+   } else {
+   return row2.getLong(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public float getFloat(int i) {
+   if (i < row1.getArity()) {
+   return row1.getFloat(i);
+   } else {
+   return row2.getFloat(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public double getDouble(int i) {
+   if (i < row1.getArity()) {
+   return row1.getDouble(i);
+   } else {
+   return row2.getDouble(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public char getChar(int i) {
+   if (i < row1.getArity()) {
+   return row1.getChar(i);
+   } else {
+   return row2.getChar(i - row1.getArity());
+   }
+   }
+
+   @Override
+   public Decimal getDecimal(int i, int precision, int scale) {
+   if (i < row1.getArity()) {
+   return row1.getDecimal(i, precision, scale);
+   } else {
+   return row2.getDecimal(i - row1.getArity(), precision, 
scale);
+   }
+   }
+
+   @Override
+   public  BinaryGeneric getGeneric(int i) {
+   if (i < row1.getArity()) {
+