[jira] [Assigned] (FLINK-10531) State TTL RocksDb backend end-to-end test failed on Travis

2018-10-18 Thread Renjie Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renjie Liu reassigned FLINK-10531:
--

Assignee: Renjie Liu

> State TTL RocksDb backend end-to-end test failed on Travis
> --
>
> Key: FLINK-10531
> URL: https://issues.apache.org/jira/browse/FLINK-10531
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.1
>Reporter: Till Rohrmann
>Assignee: Renjie Liu
>Priority: Critical
>  Labels: test-stability
>
> The {{State TTL RocksDb backend end-to-end test}} failed on Travis:
> https://travis-ci.org/apache/flink/jobs/438226190
> https://api.travis-ci.org/v3/job/438226190/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656242#comment-16656242
 ] 

ASF GitHub Bot commented on FLINK-10491:


zhijiangW commented on issue #6809: [FLINK-10491][network] Pass BufferPoolOwner 
in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#issuecomment-431236371
 
 
   @pnowojski , thanks for the reviews again! 
   
   I have two main concerns about the tests, based on the comments.
   1. How should we define the scope of this PR? I found another common issue: 
the newly added tests may expose problems in some questionable historical 
tests. If we fixed all of the related historical tests in this PR, it could 
keep the PR from converging.
   2. How should we evaluate the use of mock, spy, etc. in tests? I have 
shared some thoughts in the comments.
   
   BTW, I learned some good ideas from your test reviews, so I plan to focus 
on polishing some existing historical tests if I have time. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
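
The quoted issue below is a classic lock-ordering deadlock: each thread holds 
one monitor while waiting for the other. A minimal, standalone sketch of that 
failure mode (hypothetical code, not Flink's):

{code:java}
public class LockOrderDeadlock {
    // Stand-ins for the two monitors in the quoted thread dump, e.g. the
    // LocalBufferPool's ArrayDeque and the subpartition's buffer queue.
    private static final Object lockA = new Object();
    private static final Object lockB = new Object();

    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (lockA) {
                pause();
                synchronized (lockB) { } // waits forever: thread 2 holds lockB
            }
        }).start();
        new Thread(() -> {
            synchronized (lockB) {
                pause();
                synchronized (lockA) { } // waits forever: thread 1 holds lockA
            }
        }).start();
    }

    private static void pause() {
        try { Thread.sleep(100); } catch (InterruptedException ignored) { }
    }
}
{code}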


> Deadlock during spilling data in SpillableSubpartition 
> ---
>
> Key: FLINK-10491
> URL: https://issues.apache.org/jira/browse/FLINK-10491
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.4, 1.6.1
>Reporter: Piotr Nowojski
>Assignee: zhijiang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Originally reported here: 
> [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E]
> Thread dump (from version 1.5.3) showing two deadlocked threads that acquire 
> two locks in opposite order:
> {noformat}
> Thread-1
> "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
> waiting for monitor entry
> waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a 
> java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
> - locked <0x2dfd> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
> - locked <0x2da5> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
> at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
> at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:745)
> Thread-2
> "Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor 
> entry
> java.lang.Thread.State: BLOCKED
> blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
> waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to release 
> lock on <0x2dfd> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:171)
> at 

[GitHub] zhijiangW commented on issue #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool

2018-10-18 Thread GitBox
zhijiangW commented on issue #6809: [FLINK-10491][network] Pass BufferPoolOwner 
in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#issuecomment-431236371
 
 
   @pnowojski , thanks for the reviews again! 
   
   I have two main concerns about the tests, based on the comments.
   1. How should we define the scope of this PR? I found another common issue: 
the newly added tests may expose problems in some questionable historical 
tests. If we fixed all of the related historical tests in this PR, it could 
keep the PR from converging.
   2. How should we evaluate the use of mock, spy, etc. in tests? I have 
shared some thoughts in the comments.
   
   BTW, I learned some good ideas from your test reviews, so I plan to focus 
on polishing some existing historical tests if I have time. :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-10-18 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656239#comment-16656239
 ] 

TisonKun commented on FLINK-10540:
--

Yes, I see. And in the testing code its subclasses ({{LocalFlinkMiniCluster}} 
and {{TestingCluster}}) are still in use; we might want to get rid of those 
usages, too. Otherwise they become blockers to this task.

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656237#comment-16656237
 ] 

ASF GitHub Bot commented on FLINK-10527:


yanghua commented on a change in pull request #6816: [FLINK-10527] Cleanup 
constant isNewMode in YarnTestBase
URL: https://github.com/apache/flink/pull/6816#discussion_r226526901
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 ##
 @@ -189,9 +189,9 @@ public void perJobYarnClusterOffHeap() throws IOException {
/**
 * Test TaskManager failure and also if the vcores are set correctly 
(see issue FLINK-2213).
 */
+   @Ignore
@Test(timeout = 100000) // timeout after 100 seconds
public void testTaskManagerFailure() throws Exception {
-   assumeTrue("The new mode does not start TMs upfront.", 
!isNewMode);
 
 Review comment:
   @tillrohrmann So, is your review suggestion still the same? Do we still need 
to keep the isNewMode field? (As for these methods, I have created a [separate 
issue](https://issues.apache.org/jira/browse/FLINK-10558) to track them, so I 
think we will not forget about them in the end.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> This seems to be a residual issue from FLINK-10396, where the constant was 
> set to true. Currently it has three usage scenarios:
> 1. An assumption that now always fails:
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. if (!isNewMode): the logic in the block is never invoked, so the if block 
> can be removed.
> 3. if (isNewMode): the block is always invoked, so the if statement can be 
> removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase

2018-10-18 Thread GitBox
yanghua commented on a change in pull request #6816: [FLINK-10527] Cleanup 
constant isNewMode in YarnTestBase
URL: https://github.com/apache/flink/pull/6816#discussion_r226526901
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 ##
 @@ -189,9 +189,9 @@ public void perJobYarnClusterOffHeap() throws IOException {
/**
 * Test TaskManager failure and also if the vcores are set correctly 
(see issue FLINK-2213).
 */
+   @Ignore
@Test(timeout = 100000) // timeout after 100 seconds
public void testTaskManagerFailure() throws Exception {
-   assumeTrue("The new mode does not start TMs upfront.", 
!isNewMode);
 
 Review comment:
   @tillrohrmann So, is your review suggestion still the same? Do we still need 
to keep the isNewMode field? (As for these methods, I have created a [separate 
issue](https://issues.apache.org/jira/browse/FLINK-10558) to track them, so I 
think we will not forget about them in the end.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-10-18 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656235#comment-16656235
 ] 

Shimin Yang edited comment on FLINK-10540 at 10/19/18 3:37 AM:
---

Looks like it's only used by the flink-storm example in production code. Better 
to start after flink-storm is removed [~Tison].


was (Author: dangdangdang):
Looks like it's only used by the flink-storm example. Better to start after 
flink-storm is removed [~Tison].

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers trying to access the same file.

2018-10-18 Thread Paul Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656236#comment-16656236
 ] 

Paul Lin commented on FLINK-8098:
-

+1 for this issue. We hit the same problem on Flink 1.5.3 with CDH-5.6.0. 

> LeaseExpiredException when using FsStateBackend for checkpointing due to 
> multiple mappers trying to access the same file.
> 
>
> Key: FLINK-8098
> URL: https://issues.apache.org/jira/browse/FLINK-8098
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
> Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP
>Reporter: Shashank Agarwal
>Priority: Major
>
> I am running a streaming application with parallelism 6 and checkpointing 
> enabled (interval 1000 ms). The application crashes after 1-2 days. After 
> analysing the logs, I found the following trace. 
> {code}
> 2017-11-17 11:19:06,696 WARN  
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Could not 
> properly clean up the async checkpoint runnable.
> java.lang.Exception: Could not properly cancel managed keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983)
>   at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262)
>   at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251)
>   at 
> org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355)
>   at 
> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
>  in order to obtain the stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>   ... 8 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
>  in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
>   at 
> org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
>   at 
> org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   ... 1 more
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
>  No lease 
> flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
>  (inode 812148671): File does not exist. [Lease.  Holder: 
> DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates: 161]
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3659)
>   at 
> 

[jira] [Commented] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-10-18 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656235#comment-16656235
 ] 

Shimin Yang commented on FLINK-10540:
-

Looks like it's only used by the flink-storm example. Better to start after 
flink-storm is removed [~Tison].

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656234#comment-16656234
 ] 

ASF GitHub Bot commented on FLINK-9970:
---

yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226526582
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ##
 @@ -266,6 +268,21 @@ object ScalarFunctions {
 regexp_extract(str, regex, 0)
   }
 
+  /**
+* Returns the ASCII code value of the leftmost character of the string str.
+*/
+  def ascii(str: String): Integer = {
+if (str == null || str.equals("")) {
+  0
+} else {
+  if (CharMatcher.ASCII.matches(str.charAt(0))) {
+str.charAt(0).toByte.toInt
+  } else {
+0
 
 Review comment:
   Hi @xccui, I tried to change the return value of this method from 0 to 
null, but both related tests then throw an NPE; I don't know whether it is 
related to the Integer (un)boxing. Can you try it?
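
   For what it's worth, a minimal standalone sketch of the suspected failure 
mode, assuming the generated code unboxes the returned `java.lang.Integer` 
into a primitive `int` (hypothetical code, not the actual codegen):

```java
public class UnboxNpeDemo {
    static Integer ascii(String str) {
        if (str == null || str.isEmpty()) {
            return null; // the experimental change: null instead of 0
        }
        return (int) str.charAt(0);
    }

    public static void main(String[] args) {
        int code = ascii(""); // auto-unboxing of null throws NullPointerException
        System.out.println(code);
    }
}
```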


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ASCII/CHR function for table/sql API
> 
>
> Key: FLINK-9970
> URL: https://issues.apache.org/jira/browse/FLINK-9970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> For the ASCII function, refer to: 
> [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii]
> For the CHR function, which converts an ASCII code to a character, refer to: 
> [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html]
> Since "CHAR" is a keyword in many databases, we use the "CHR" keyword 
> instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API

2018-10-18 Thread GitBox
yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226526582
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ##
 @@ -266,6 +268,21 @@ object ScalarFunctions {
 regexp_extract(str, regex, 0)
   }
 
+  /**
+* Returns the ASCII code value of the leftmost character of the string str.
+*/
+  def ascii(str: String): Integer = {
+if (str == null || str.equals("")) {
+  0
+} else {
+  if (CharMatcher.ASCII.matches(str.charAt(0))) {
+str.charAt(0).toByte.toInt
+  } else {
+0
 
 Review comment:
   Hi @xccui, I tried to change the return value of this method from 0 to 
null, but both related tests then throw an NPE; I don't know whether it is 
related to the Integer (un)boxing. Can you try it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656231#comment-16656231
 ] 

ASF GitHub Bot commented on FLINK-10491:


zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r226525858
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 ##
 @@ -205,6 +216,66 @@ protected void testAddOnPartition(final 
ResultPartitionType pipelined)
}
}
 
+   @Test
+   public void testReleaseMemoryOnBlockingPartition() throws Exception {
+   testReleaseMemory(ResultPartitionType.BLOCKING);
+   }
+
+   @Test
+   public void testReleaseMemoryOnPipelinedPartition() throws Exception {
+   testReleaseMemory(ResultPartitionType.PIPELINED);
+   }
+
+   /**
+* Tests {@link ResultPartition#releaseMemory(int)} on a working 
partition.
+*
+* @param resultPartitionType the result partition type to set up
+*/
+   private void testReleaseMemory(final ResultPartitionType 
resultPartitionType) throws Exception {
+   final int numBuffers = 10;
+   final NetworkEnvironment network = new NetworkEnvironment(
+   new NetworkBufferPool(numBuffers, 128),
+   new LocalConnectionManager(),
+   new ResultPartitionManager(),
+   new TaskEventDispatcher(),
+   new KvStateRegistry(),
+   null,
+   null,
+   IOManager.IOMode.SYNC,
+   0,
+   0,
+   2,
+   8,
+   true);
+   final ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
+   final ResultPartition resultPartition = 
spy(createPartition(notifier, resultPartitionType, false));
 
 Review comment:
   I also considered that the `spy` usage here may raise some concerns, and I 
thought about the way you mentioned to avoid the `spy`. Let me explain why I 
used `spy` in the end.
   
   I think there should be two separate tests verifying different logic along 
two dimensions:
   1. Whether the `ResultPartition` is correctly assigned as the 
`BufferPoolOwner` when the `BufferPool` is created for the different partition 
types; this test verifies that relationship.
   
   2. Verifying the logic of the `BufferPoolOwner#releaseMemory` 
implementation, which is also missing currently, as I mentioned in previous 
comments. But I think that is outside the scope of this PR, and I am willing 
to open a JIRA for it. At first I also thought the first verification did not 
belong in the scope of this PR either; it only surfaced because migrating the 
historical code revealed historically missing tests.
   
   So if we wanted to check the state of `NetworkBufferPool` to avoid the 
`spy` here, we would have to touch the detailed logic of 
`BufferPoolOwner#releaseMemory` mentioned in the second part. The first part 
is enough for this PR to avoid mistakes in assigning the `BufferPoolOwner`. 
What do you think?
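
   As a minimal illustration of the first dimension (verifying that the owner 
is wired in during construction), a sketch with hypothetical 
`BufferOwner`/`Pool` types, not Flink's actual classes:

```java
import static org.mockito.Mockito.*;

public class OwnerWiringTest {
    interface BufferOwner { void releaseMemory(int numBuffers); }

    static class Pool {
        private final BufferOwner owner;
        Pool(BufferOwner owner) { this.owner = owner; } // constructor injection, as in the PR title
        void setNumBuffers(int n) { owner.releaseMemory(n); } // simplified trigger
    }

    @org.junit.Test
    public void ownerIsWiredDuringConstruction() {
        BufferOwner owner = mock(BufferOwner.class);
        new Pool(owner).setNumBuffers(1);
        verify(owner).releaseMemory(1); // checks the wiring, not the release logic
    }
}
```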
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Deadlock during spilling data in SpillableSubpartition 
> ---
>
> Key: FLINK-10491
> URL: https://issues.apache.org/jira/browse/FLINK-10491
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.4, 1.6.1
>Reporter: Piotr Nowojski
>Assignee: zhijiang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Originally reported here: 
> [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E]
> Thread dump (from version 1.5.3) showing two deadlocked threads that acquire 
> two locks in opposite order:
> {noformat}
> Thread-1
> "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
> waiting for monitor entry
> waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a 
> java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
> at 
> 

[GitHub] zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool

2018-10-18 Thread GitBox
zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r226525858
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 ##
 @@ -205,6 +216,66 @@ protected void testAddOnPartition(final 
ResultPartitionType pipelined)
}
}
 
+   @Test
+   public void testReleaseMemoryOnBlockingPartition() throws Exception {
+   testReleaseMemory(ResultPartitionType.BLOCKING);
+   }
+
+   @Test
+   public void testReleaseMemoryOnPipelinedPartition() throws Exception {
+   testReleaseMemory(ResultPartitionType.PIPELINED);
+   }
+
+   /**
+* Tests {@link ResultPartition#releaseMemory(int)} on a working 
partition.
+*
+* @param resultPartitionType the result partition type to set up
+*/
+   private void testReleaseMemory(final ResultPartitionType 
resultPartitionType) throws Exception {
+   final int numBuffers = 10;
+   final NetworkEnvironment network = new NetworkEnvironment(
+   new NetworkBufferPool(numBuffers, 128),
+   new LocalConnectionManager(),
+   new ResultPartitionManager(),
+   new TaskEventDispatcher(),
+   new KvStateRegistry(),
+   null,
+   null,
+   IOManager.IOMode.SYNC,
+   0,
+   0,
+   2,
+   8,
+   true);
+   final ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
+   final ResultPartition resultPartition = 
spy(createPartition(notifier, resultPartitionType, false));
 
 Review comment:
   I also considered that the `spy` usage here may raise some concerns, and I 
thought about the way you mentioned to avoid the `spy`. Let me explain why I 
used `spy` in the end.
   
   I think there should be two separate tests verifying different logic along 
two dimensions:
   1. Whether the `ResultPartition` is correctly assigned as the 
`BufferPoolOwner` when the `BufferPool` is created for the different partition 
types; this test verifies that relationship.
   
   2. Verifying the logic of the `BufferPoolOwner#releaseMemory` 
implementation, which is also missing currently, as I mentioned in previous 
comments. But I think that is outside the scope of this PR, and I am willing 
to open a JIRA for it. At first I also thought the first verification did not 
belong in the scope of this PR either; it only surfaced because migrating the 
historical code revealed historically missing tests.
   
   So if we wanted to check the state of `NetworkBufferPool` to avoid the 
`spy` here, we would have to touch the detailed logic of 
`BufferPoolOwner#releaseMemory` mentioned in the second part. The first part 
is enough for this PR to avoid mistakes in assigning the `BufferPoolOwner`. 
What do you think?
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8483) Implement and expose outer joins

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656229#comment-16656229
 ] 

ASF GitHub Bot commented on FLINK-8483:
---

xccui commented on issue #6874: [FLINK-8483][DataStream] Implement and expose 
outer joins
URL: https://github.com/apache/flink/pull/6874#issuecomment-431232789
 
 
   
   Hi @florianschmidt1994 , thanks for working on this!
   
   Before looking into the code, I'd like to share two thoughts.
   
   1. I think there's no need to register a clean-up timer for each record. In 
[TimeBoundedJoin](https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala), 
only the first timer is registered in the `processElement()` method, and the 
following timers are registered in a "chained manner" after each clean-up (see 
the sketch below).
   
   2. Without holding back watermarks, the join results may be treated as 
outdated by the downstream operators (depending on the mechanism used for 
rowtime propagation), which makes the join operation less applicable. What do 
you think?
   
   Best, Xingcan
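
A hedged sketch of the "chained" clean-up pattern from point 1, using a keyed 
process function; the interval and state names are assumptions, not 
`TimeBoundedStreamJoin`'s actual code:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ChainedCleanupFunction extends KeyedProcessFunction<String, String, String> {
    private static final long CLEANUP_INTERVAL = 60_000L; // hypothetical interval

    private transient ValueState<Long> nextCleanupTs;

    @Override
    public void open(Configuration parameters) {
        nextCleanupTs = getRuntimeContext().getState(
                new ValueStateDescriptor<>("nextCleanupTs", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Register only the very first timer; later timers are chained in onTimer().
        if (nextCleanupTs.value() == null) {
            long ts = ctx.timerService().currentWatermark() + CLEANUP_INTERVAL;
            ctx.timerService().registerEventTimeTimer(ts);
            nextCleanupTs.update(ts);
        }
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // ... clean up expired state here, then chain the next timer.
        long ts = timestamp + CLEANUP_INTERVAL;
        ctx.timerService().registerEventTimeTimer(ts);
        nextCleanupTs.update(ts);
    }
}
```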


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement and expose outer joins
> 
>
> Key: FLINK-8483
> URL: https://issues.apache.org/jira/browse/FLINK-8483
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] xccui commented on issue #6874: [FLINK-8483][DataStream] Implement and expose outer joins

2018-10-18 Thread GitBox
xccui commented on issue #6874: [FLINK-8483][DataStream] Implement and expose 
outer joins
URL: https://github.com/apache/flink/pull/6874#issuecomment-431232789
 
 
   
   Hi @florianschmidt1994 , thanks for working on this!
   
   Before looking into the code, I'd like to share two thoughts.
   
   1. I think there's no need to register a clean-up timer for each record. In 
[TimeBoundedJoin](https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TimeBoundedStreamJoin.scala), 
only the first timer is registered in the `processElement()` method, and the 
following timers are registered in a "chained manner" after each clean-up.
   
   2. Without holding back watermarks, the join results may be treated as 
outdated by the downstream operators (depending on the mechanism used for 
rowtime propagation), which makes the join operation less applicable. What do 
you think?
   
   Best, Xingcan


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on a change in pull request #6873: [hotfix] Add Review Progress section to PR description template.

2018-10-18 Thread GitBox
sunjincheng121 commented on a change in pull request #6873: [hotfix] Add Review 
Progress section to PR description template.
URL: https://github.com/apache/flink/pull/6873#discussion_r226525062
 
 

 ##
 File path: .github/PULL_REQUEST_TEMPLATE.md
 ##
 @@ -70,3 +70,17 @@ This change added tests and can be verified as follows:
 
   - Does this pull request introduce a new feature? (yes / no)
   - If yes, how is the feature documented? (not applicable / docs / JavaDocs / 
not documented)
+
+
+
+# Review Progress
+
+**NOTE: THE REVIEW PROGRESS MUST ONLY BE UPDATED BY AN APACHE FLINK 
COMMITTER!**
+
+* [ ] 1. The contribution is well-described.
+* [ ] 2. There is consensus that the contribution should go into to Flink.
+* [ ] 3. [Does not need specific attention | Needs specific attention for X | 
Has attention for X by Y]
 
 Review comment:
   About [3], If the committer identifies [Needs specific attention for X], 
what do we need to do with PR?
   From the points of my view, the committer who change the review process, 
have to make sure all the specific attention parts are solved.So the [3] should 
be described : [ ] 3. All the specific attention parts are well-solved.
What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-10-18 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656218#comment-16656218
 ] 

TisonKun commented on FLINK-10540:
--

Great take [~dangdangdang]!

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656215#comment-16656215
 ] 

ASF GitHub Bot commented on FLINK-10491:


zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r226523253
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 ##
 @@ -205,6 +216,66 @@ protected void testAddOnPartition(final 
ResultPartitionType pipelined)
}
}
 
+   @Test
+   public void testReleaseMemoryOnBlockingPartition() throws Exception {
+   testReleaseMemory(ResultPartitionType.BLOCKING);
+   }
+
+   @Test
+   public void testReleaseMemoryOnPipelinedPartition() throws Exception {
+   testReleaseMemory(ResultPartitionType.PIPELINED);
+   }
+
+   /**
+* Tests {@link ResultPartition#releaseMemory(int)} on a working 
partition.
+*
+* @param resultPartitionType the result partition type to set up
+*/
+   private void testReleaseMemory(final ResultPartitionType 
resultPartitionType) throws Exception {
+   final int numBuffers = 10;
+   final NetworkEnvironment network = new NetworkEnvironment(
+   new NetworkBufferPool(numBuffers, 128),
+   new LocalConnectionManager(),
+   new ResultPartitionManager(),
+   new TaskEventDispatcher(),
+   new KvStateRegistry(),
+   null,
+   null,
+   IOManager.IOMode.SYNC,
+   0,
+   0,
+   2,
+   8,
+   true);
+   final ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
 
 Review comment:
   I checked these two implementations, and they are exactly the same: both 
are empty operations. So I think we should open a separate JIRA to unify the 
two existing implementations and make the result public for outside usage.
   
   Regarding the mock issue, I have some concerns:
   
   1. Do you think there is any difference between a mock and an empty 
operation in a real class?
   
   2. There are already many usages of mock in the existing tests; do you 
think it is necessary to replace them with real classes? If so, I am willing 
to open JIRAs for them. :)
   
   3. I think unit tests are mainly for verifying partial logic or processes, 
so if the behavior of some classes does not matter for the target test, we can 
mock those objects to keep things simple. Otherwise we can write an ITCase for 
end-to-end verification with all real classes (see the sketch below).
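
   A small sketch of the mock-versus-empty-implementation contrast, with a 
hypothetical notifier interface (not the real one):

```java
import static org.mockito.Mockito.mock;

public class NotifierStubs {
    interface Notifier { void notifyConsumable(); }

    // Option A: a Mockito mock; every method gets a default no-op answer.
    static Notifier viaMock() {
        return mock(Notifier.class);
    }

    // Option B: a shared, explicitly empty implementation in real code.
    // Behaviour is the same, but the intent is visible and reusable.
    static class NoOpNotifier implements Notifier {
        @Override
        public void notifyConsumable() { /* intentionally empty */ }
    }
}
```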


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Deadlock during spilling data in SpillableSubpartition 
> ---
>
> Key: FLINK-10491
> URL: https://issues.apache.org/jira/browse/FLINK-10491
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.4, 1.6.1
>Reporter: Piotr Nowojski
>Assignee: zhijiang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Originally reported here: 
> [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E]
> Thread dump (from version 1.5.3) showing two deadlocked threads that acquire 
> two locks in opposite order:
> {noformat}
> Thread-1
> "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
> waiting for monitor entry
> waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a 
> java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
> - locked <0x2dfd> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
> - locked <0x2da5> (a 

[GitHub] zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool

2018-10-18 Thread GitBox
zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r226523253
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 ##
 @@ -205,6 +216,66 @@ protected void testAddOnPartition(final 
ResultPartitionType pipelined)
}
}
 
+   @Test
+   public void testReleaseMemoryOnBlockingPartition() throws Exception {
+   testReleaseMemory(ResultPartitionType.BLOCKING);
+   }
+
+   @Test
+   public void testReleaseMemoryOnPipelinedPartition() throws Exception {
+   testReleaseMemory(ResultPartitionType.PIPELINED);
+   }
+
+   /**
+* Tests {@link ResultPartition#releaseMemory(int)} on a working 
partition.
+*
+* @param resultPartitionType the result partition type to set up
+*/
+   private void testReleaseMemory(final ResultPartitionType 
resultPartitionType) throws Exception {
+   final int numBuffers = 10;
+   final NetworkEnvironment network = new NetworkEnvironment(
+   new NetworkBufferPool(numBuffers, 128),
+   new LocalConnectionManager(),
+   new ResultPartitionManager(),
+   new TaskEventDispatcher(),
+   new KvStateRegistry(),
+   null,
+   null,
+   IOManager.IOMode.SYNC,
+   0,
+   0,
+   2,
+   8,
+   true);
+   final ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
 
 Review comment:
   I checked these two implementations, and they are exactly the same: both 
are empty operations. So I think we should open a separate JIRA to unify the 
two existing implementations and make the result public for outside usage.
   
   Regarding the mock issue, I have some concerns:
   
   1. Do you think there is any difference between a mock and an empty 
operation in a real class?
   
   2. There are already many usages of mock in the existing tests; do you 
think it is necessary to replace them with real classes? If so, I am willing 
to open JIRAs for them. :)
   
   3. I think unit tests are mainly for verifying partial logic or processes, 
so if the behavior of some classes does not matter for the target test, we can 
mock those objects to keep things simple. Otherwise we can write an ITCase for 
end-to-end verification with all real classes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-10-18 Thread Shimin Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shimin Yang reassigned FLINK-10540:
---

Assignee: Shimin Yang

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: Shimin Yang
>Priority: Major
> Fix For: 1.7.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656201#comment-16656201
 ] 

ASF GitHub Bot commented on FLINK-10491:


zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r226520720
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 ##
 @@ -205,6 +216,66 @@ protected void testAddOnPartition(final 
ResultPartitionType pipelined)
}
}
 
+   @Test
+   public void testReleaseMemoryOnBlockingPartition() throws Exception {
+   testReleaseMemory(ResultPartitionType.BLOCKING);
+   }
+
+   @Test
+   public void testReleaseMemoryOnPipelinedPartition() throws Exception {
+   testReleaseMemory(ResultPartitionType.PIPELINED);
+   }
+
+   /**
+* Tests {@link ResultPartition#releaseMemory(int)} on a working 
partition.
+*
+* @param resultPartitionType the result partition type to set up
+*/
+   private void testReleaseMemory(final ResultPartitionType 
resultPartitionType) throws Exception {
+   final int numBuffers = 10;
+   final NetworkEnvironment network = new NetworkEnvironment(
 
 Review comment:
   Yes, I agree with the option of providing a `NetworkEnvironmentBuilder` 
nested in `NetworkEnvironment`. 
   But I think it is worth opening a separate JIRA for unifying all the related 
history usages, because it is not caused by this PR, and this PR would be based 
on that new JIRA. What do you think?
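
   A rough sketch of the nested builder idea, with hypothetical fields and 
defaults (not the actual `NetworkEnvironment` constructor parameters):

```java
public class NetworkEnvironmentSketch {
    private final int numBuffers;
    private final int segmentSize;

    private NetworkEnvironmentSketch(Builder b) {
        this.numBuffers = b.numBuffers;
        this.segmentSize = b.segmentSize;
    }

    public static class Builder {
        private int numBuffers = 10;   // test-friendly defaults replace the
        private int segmentSize = 128; // long positional argument list

        public Builder setNumBuffers(int n) { this.numBuffers = n; return this; }
        public Builder setSegmentSize(int s) { this.segmentSize = s; return this; }

        public NetworkEnvironmentSketch build() {
            return new NetworkEnvironmentSketch(this);
        }
    }
}

// usage in a test:
// NetworkEnvironmentSketch network = new NetworkEnvironmentSketch.Builder()
//         .setNumBuffers(10)
//         .build();
```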


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Deadlock during spilling data in SpillableSubpartition 
> ---
>
> Key: FLINK-10491
> URL: https://issues.apache.org/jira/browse/FLINK-10491
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.4, 1.6.1
>Reporter: Piotr Nowojski
>Assignee: zhijiang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Originally reported here: 
> [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E]
> Thread dump (from version 1.5.3) showing two deadlocked threads that acquire 
> two locks in opposite order:
> {noformat}
> Thread-1
> "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
> waiting for monitor entry
> waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a 
> java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
> - locked <0x2dfd> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
> - locked <0x2da5> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
> at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
> at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> at 
> 

[GitHub] zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool

2018-10-18 Thread GitBox
zhijiangW commented on a change in pull request #6809: [FLINK-10491][network] 
Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809#discussion_r226520720
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 ##
 @@ -205,6 +216,66 @@ protected void testAddOnPartition(final 
ResultPartitionType pipelined)
}
}
 
+   @Test
+   public void testReleaseMemoryOnBlockingPartition() throws Exception {
+   testReleaseMemory(ResultPartitionType.BLOCKING);
+   }
+
+   @Test
+   public void testReleaseMemoryOnPipelinedPartition() throws Exception {
+   testReleaseMemory(ResultPartitionType.PIPELINED);
+   }
+
+   /**
+* Tests {@link ResultPartition#releaseMemory(int)} on a working 
partition.
+*
+* @param resultPartitionType the result partition type to set up
+*/
+   private void testReleaseMemory(final ResultPartitionType 
resultPartitionType) throws Exception {
+   final int numBuffers = 10;
+   final NetworkEnvironment network = new NetworkEnvironment(
 
 Review comment:
   Yes, I agree with the option of providing a `NetworkEnvironmentBuilder` 
nested in `NetworkEnvironment`.
   But I think it is worth opening a separate JIRA to unify all the related 
historical usages, because they are not caused by this PR, and this PR would 
then be based on that new JIRA. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656200#comment-16656200
 ] 

ASF GitHub Bot commented on FLINK-9970:
---

yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226520617
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ##
 @@ -252,6 +252,17 @@ object ScalarFunctions {
 regexp_extract(str, regex, 0)
   }
 
+  /**
+* Returns a numeric value of the leftmost character of the string str.
+*/
+  def ascii(str: String): Integer = {
+if (str == null || str.equals("")) {
+  0
+} else {
+  str.charAt(0).toByte.toInt
 
 Review comment:
   I think it really is necessary to add an ASCII check here. I misunderstood 
what you meant; I thought you were talking about [this issue]( 
https://github.com/apache/flink/pull/6432#discussion_r205512482).
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ASCII/CHR function for table/sql API
> 
>
> Key: FLINK-9970
> URL: https://issues.apache.org/jira/browse/FLINK-9970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> For the ASCII function, refer to: 
> [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii]
> For the CHR function, which converts an ASCII code to a character, refer to: 
> [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html]
> Since "CHAR" is a keyword in many databases, we use the "CHR" keyword 
> instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API

2018-10-18 Thread GitBox
yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226520617
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ##
 @@ -252,6 +252,17 @@ object ScalarFunctions {
 regexp_extract(str, regex, 0)
   }
 
+  /**
+* Returns a numeric value of the leftmost character of the string str.
+*/
+  def ascii(str: String): Integer = {
+if (str == null || str.equals("")) {
+  0
+} else {
+  str.charAt(0).toByte.toInt
 
 Review comment:
   I think it really is necessary to add an ASCII check here. I misunderstood 
what you meant; I thought you were talking about [this issue]( 
https://github.com/apache/flink/pull/6432#discussion_r205512482).
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10166) Dependency problems when executing SQL query in sql-client

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656183#comment-16656183
 ] 

ASF GitHub Bot commented on FLINK-10166:


leanken commented on issue #6863: [FLINK-10166][table] Replace 
commons.codec.binary.Base64 with java.util.Base64
URL: https://github.com/apache/flink/pull/6863#issuecomment-431223615
 
 
   Let me fix the test-case build break first.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Dependency problems when executing SQL query in sql-client
> --
>
> Key: FLINK-10166
> URL: https://issues.apache.org/jira/browse/FLINK-10166
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> When tried to run query:
> {code}
> select count(distinct name) from (Values ('a'), ('b')) AS NameTable(name)
> {code}
> in {{sql-client.sh}} I got:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 43, Column 10: Unknown 
> variable or type "org.apache.commons.codec.binary.Base64"
> {code}
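
For reference, a minimal sketch of the substitution described by the PR title, 
mapping the commons-codec calls onto the JDK's built-in codec (variable names 
are illustrative):

{code:java}
import java.util.Base64;

public class Base64Migration {
    public static void main(String[] args) {
        byte[] data = "flink".getBytes();
        // org.apache.commons.codec.binary.Base64.encodeBase64String(data) becomes:
        String encoded = Base64.getEncoder().encodeToString(data);
        // org.apache.commons.codec.binary.Base64.decodeBase64(encoded) becomes:
        byte[] decoded = Base64.getDecoder().decode(encoded);
        System.out.println(encoded + " -> " + new String(decoded));
    }
}
{code}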



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] leanken commented on issue #6863: [FLINK-10166][table] Replace commons.codec.binary.Base64 with java.util.Base64

2018-10-18 Thread GitBox
leanken commented on issue #6863: [FLINK-10166][table] Replace 
commons.codec.binary.Base64 with java.util.Base64
URL: https://github.com/apache/flink/pull/6863#issuecomment-431223615
 
 
   Let me fix the test-case build break first.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] XuQianJin-Stars commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed

2018-10-18 Thread GitBox
XuQianJin-Stars commented on a change in pull request #6823: [FLINK-10134] 
UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r226516362
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ##
 @@ -472,6 +498,7 @@ public void open(FileInputSplit split) throws IOException {
 
this.offset = splitStart;
if (this.splitStart != 0) {
+   setBomFileCharset(split);
 
 Review comment:
   I have two questions about this commit, as follows:
   Regarding the first suggestion, I feel that users often cannot know the 
encoding of a file accurately. For example, for a file encoded as `UTF-16LE` 
with a BOM header, a user-specified encoding of `UTF-16BE` will cause an 
error. And I believe UTF files carrying a BOM will be the majority. So I think 
it is necessary to do the BOM detection first, which is better for the user 
experience (see the sketch below).
   Regarding the fourth recommendation, `GenericCsvInputFormat` cannot seek to 
position 0: it calls the `seek` method of `InputStreamFSInputWrapper`, and 
that method currently cannot seek to position 0.
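
   A hedged sketch of the BOM detection idea, plus a demonstration of why it 
matters (the sniffing helper is hypothetical, not this PR's code):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class BomDemo {
    // Hypothetical BOM sniffing on the first bytes of a split: prefer the BOM,
    // fall back to the user-specified charset when no BOM is present.
    static String detectUtf16Charset(byte[] head, String userCharset) {
        if (head.length >= 2 && (head[0] & 0xFF) == 0xFE && (head[1] & 0xFF) == 0xFF) {
            return "UTF-16BE";
        }
        if (head.length >= 2 && (head[0] & 0xFF) == 0xFF && (head[1] & 0xFF) == 0xFE) {
            return "UTF-16LE";
        }
        return userCharset;
    }

    public static void main(String[] args) {
        // Plain "UTF-16" encodes big-endian *with* a BOM prefix, so a delimiter
        // encoded this way can never match bytes in the middle of a file:
        System.out.println(Arrays.toString("\n".getBytes(StandardCharsets.UTF_16)));   // [-2, -1, 0, 10]
        System.out.println(Arrays.toString("\n".getBytes(StandardCharsets.UTF_16BE))); // [0, 10]
        System.out.println(Arrays.toString("\n".getBytes(StandardCharsets.UTF_16LE))); // [10, 0]
    }
}
```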


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656173#comment-16656173
 ] 

ASF GitHub Bot commented on FLINK-10134:


XuQianJin-Stars commented on a change in pull request #6823: [FLINK-10134] 
UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r226516362
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ##
 @@ -472,6 +498,7 @@ public void open(FileInputSplit split) throws IOException {
 
this.offset = splitStart;
if (this.splitStart != 0) {
+   setBomFileCharset(split);
 
 Review comment:
   I have two questions about this commit, as follows:
   For the first suggestion, I feel that users often cannot know the encoding 
of a file accurately. For example: if the file encoding is `UTF-16LE` with a 
BOM header and the user specifies `UTF-16BE`, an error will be reported. And I 
believe UTF files that carry a BOM will be the majority, so I think it is 
necessary to do the BOM detection first, which is better for the user 
experience.
   For the fourth recommendation, `GenericCsvInputFormat` cannot seek to 
position 0. It calls the `seek` method of `InputStreamFSInputWrapper`, and 
that method currently cannot seek to position 0.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> UTF-16 support for TextInputFormat
> --
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.2
>Reporter: David Dreyfus
>Priority: Critical
>  Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In 
> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) 
> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>  
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), 
> which sets TextInputFormat.charsetName and then modifies the previously set 
> delimiterString to construct the proper byte string encoding of the 
> delimiter. This same charsetName is also used in TextInputFormat.readRecord() 
> to interpret the bytes read from the file.
>  
> There are two problems that this implementation would seem to have when using 
> UTF-16.
>  # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will 
> return a Big Endian byte sequence including the Byte Order Mark (BOM). The 
> actual text file will not contain a BOM at each line ending, so the delimiter 
> will never be read. Moreover, if the actual byte encoding of the file is 
> Little Endian, the bytes will be interpreted incorrectly.
>  # TextInputFormat.readRecord() will not see a BOM each time it decodes a 
> byte sequence with the String(bytes, offset, numBytes, charset) call. 
> Therefore, it will assume Big Endian, which may not always be correct. [1] 
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>  
> While there are likely many solutions, I would think that all of them would 
> have to start by reading the BOM from the file when a Split is opened and 
> then using that BOM to modify the specified encoding to a BOM specific one 
> when the caller doesn't specify one, and to overwrite the caller's 
> specification if the BOM is in conflict with the caller's specification. That 
> is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, 
> Flink should rewrite the charsetName as UTF-16LE.
>  I hope this makes sense and that I haven't been testing incorrectly or 
> misreading the code.
>  
> I've verified the problem on version 1.4.2. I believe the problem exists on 
> all versions. 
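
The first problem is easy to reproduce outside Flink. This standalone snippet
(not from the ticket) shows that encoding a delimiter with the generic
"UTF-16" charset name prepends a BOM and assumes big-endian, while the
endian-specific names do not:

{code:java}
import java.nio.charset.Charset;

public class DelimiterBomDemo {
    public static void main(String[] args) {
        print("UTF-16");    // FE FF 00 0A  <- BOM prepended, big-endian assumed
        print("UTF-16BE");  // 00 0A
        print("UTF-16LE");  // 0A 00
    }

    private static void print(String charset) {
        byte[] bytes = "\n".getBytes(Charset.forName(charset));
        StringBuilder sb = new StringBuilder(charset + ":");
        for (byte b : bytes) {
            sb.append(String.format(" %02X", b));
        }
        System.out.println(sb);
    }
}
{code}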



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] XuQianJin-Stars commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed

2018-10-18 Thread GitBox
XuQianJin-Stars commented on a change in pull request #6823: [FLINK-10134] 
UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r226516362
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ##
 @@ -472,6 +498,7 @@ public void open(FileInputSplit split) throws IOException {
 
this.offset = splitStart;
if (this.splitStart != 0) {
+   setBomFileCharset(split);
 
 Review comment:
   I have two questions about this commit, as follows:
   For the first suggestion, I feel that users often cannot know the encoding 
of a file accurately. For example: if the file encoding is `UTF-16LE` with a 
BOM header and the user specifies `UTF-16BE`, an error will be reported. And I 
believe UTF files that carry a BOM will be the majority, so I think it is 
necessary to do the BOM detection first, which is better for the user 
experience.
   For the fourth recommendation, `GenericCsvInputFormat` cannot seek to 
position 0. It calls the `seek` method of `InputStreamFSInputWrapper`, and 
that method currently cannot seek to position 0.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656161#comment-16656161
 ] 

ASF GitHub Bot commented on FLINK-10134:


XuQianJin-Stars commented on a change in pull request #6823: [FLINK-10134] 
UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r226516362
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ##
 @@ -472,6 +498,7 @@ public void open(FileInputSplit split) throws IOException {
 
this.offset = splitStart;
if (this.splitStart != 0) {
+   setBomFileCharset(split);
 
 Review comment:
   I have two questions about this commit, as follows:
   For the first suggestion, I feel that users often cannot know the encoding 
of the file accurately. For example: file encoding `UTF-16LE`, with bom header, 
user-specified encoding `UTF-16BE` will report an error. And there is bom UTF 
encoding I believe will be the majority. So I think it is necessary to do the 
bom code detection first, which is better for the user experience.
   For the fourth recommendation, the seek of `GenericCsvInputFormat` cannot be 
seek to position 0. It calls the `seek` method of `InputStreamFSInputWrapper`. 
This method cannot currently seek to position 0.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> UTF-16 support for TextInputFormat
> --
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.2
>Reporter: David Dreyfus
>Priority: Critical
>  Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In 
> particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) 
> to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>  
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), 
> which sets TextInputFormat.charsetName and then modifies the previously set 
> delimiterString to construct the proper byte string encoding of the the 
> delimiter. This same charsetName is also used in TextInputFormat.readRecord() 
> to interpret the bytes read from the file.
>  
> There are two problems that this implementation would seem to have when using 
> UTF-16.
>  # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will 
> return a Big Endian byte sequence including the Byte Order Mark (BOM). The 
> actual text file will not contain a BOM at each line ending, so the delimiter 
> will never be read. Moreover, if the actual byte encoding of the file is 
> Little Endian, the bytes will be interpreted incorrectly.
>  # TextInputFormat.readRecord() will not see a BOM each time it decodes a 
> byte sequence with the String(bytes, offset, numBytes, charset) call. 
> Therefore, it will assume Big Endian, which may not always be correct. [1] 
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>  
> While there are likely many solutions, I would think that all of them would 
> have to start by reading the BOM from the file when a Split is opened and 
> then using that BOM to modify the specified encoding to a BOM specific one 
> when the caller doesn't specify one, and to overwrite the caller's 
> specification if the BOM is in conflict with the caller's specification. That 
> is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, 
> Flink should rewrite the charsetName as UTF-16LE.
>  I hope this makes sense and that I haven't been testing incorrectly or 
> misreading the code.
>  
> I've verified the problem on version 1.4.2. I believe the problem exists on 
> all versions. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656159#comment-16656159
 ] 

ASF GitHub Bot commented on FLINK-9970:
---

yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226515985
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ##
 @@ -252,6 +252,17 @@ object ScalarFunctions {
 regexp_extract(str, regex, 0)
   }
 
+  /**
+* Returns a numeric value of the leftmost character of the string str.
+*/
+  def ascii(str: String): Integer = {
+if (str == null || str.equals("")) {
+  0
 
 Review comment:
   OK, I can return `null`. In the Java world, Integer is an object, and 
returning null is fine from a syntactic and semantic point of view. However, 
when we use ascii('') as input to other operators, such as 
xxxFunction(ascii('')), the subsequent effects of null and 0 are completely 
different: null may produce an NPE or other behavior, while 0 flows through 
silently. If you have already considered this, then I have no problem here.
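
The concern can be made concrete in plain Java (illustrative only, not Flink
code): a null Integer blows up as soon as a downstream consumer unboxes it,
whereas 0 flows through silently. The helper name below is made up:

{code:java}
public class NullVsZero {
    // Stand-in for a downstream operator that unboxes its input (hypothetical).
    static int plusOne(Integer v) {
        return v + 1; // auto-unboxing: throws NullPointerException if v is null
    }

    public static void main(String[] args) {
        System.out.println(plusOne(0)); // 1
        try {
            plusOne(null);
        } catch (NullPointerException e) {
            System.out.println("null input -> NPE in the consumer");
        }
    }
}
{code}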


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ASCII/CHR function for table/sql API
> 
>
> Key: FLINK-9970
> URL: https://issues.apache.org/jira/browse/FLINK-9970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> for ASCII function: 
> refer to: 
> [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii]
> for CHR function: 
> This function converts an ASCII code to a character;
> refer to: [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html]
> Considering "CHAR" is always a keyword in many databases, we use the "CHR" 
> keyword.
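
Per the references above, ASCII maps the leftmost character of a string to its
numeric value and CHR is the inverse mapping. In plain Java terms (a sketch of
the semantics, not the Flink implementation):

{code:java}
public class AsciiChrDemo {
    public static void main(String[] args) {
        // ASCII('Abc') -> 65: numeric value of the leftmost character
        System.out.println((int) "Abc".charAt(0)); // 65
        // CHR(65) -> 'A': the inverse direction
        System.out.println((char) 65);             // A
    }
}
{code}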



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API

2018-10-18 Thread GitBox
yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226515985
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ##
 @@ -252,6 +252,17 @@ object ScalarFunctions {
 regexp_extract(str, regex, 0)
   }
 
+  /**
+* Returns a numeric value of the leftmost character of the string str.
+*/
+  def ascii(str: String): Integer = {
+if (str == null || str.equals("")) {
+  0
 
 Review comment:
   OK, I can return `null`. In the Java world, Integer is an object, and 
returning null is fine from a syntactic and semantic point of view. However, 
when we use ascii('') as input to other operators, such as 
xxxFunction(ascii('')), the subsequent effects of null and 0 are completely 
different: null may produce an NPE or other behavior, while 0 flows through 
silently. If you have already considered this, then I have no problem here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656144#comment-16656144
 ] 

ASF GitHub Bot commented on FLINK-9970:
---

yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226513490
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ##
 @@ -252,6 +252,17 @@ object ScalarFunctions {
 regexp_extract(str, regex, 0)
   }
 
+  /**
+* Returns a numeric value of the leftmost character of the string str.
+*/
+  def ascii(str: String): Integer = {
+if (str == null || str.equals("")) {
+  0
 
 Review comment:
   OK, will change it


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ASCII/CHR function for table/sql API
> 
>
> Key: FLINK-9970
> URL: https://issues.apache.org/jira/browse/FLINK-9970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> for ASCII function: 
> refer to: 
> [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii]
> for CHR function: 
> This function converts an ASCII code to a character;
> refer to: [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html]
> Considering "CHAR" is always a keyword in many databases, we use the "CHR" 
> keyword.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API

2018-10-18 Thread GitBox
yanghua commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226513490
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ##
 @@ -252,6 +252,17 @@ object ScalarFunctions {
 regexp_extract(str, regex, 0)
   }
 
+  /**
+* Returns a numeric value of the leftmost character of the string str.
+*/
+  def ascii(str: String): Integer = {
+if (str == null || str.equals("")) {
+  0
 
 Review comment:
   OK, will change it


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656141#comment-16656141
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

yanghua edited a comment on issue #6703: [FLINK-9697] Provide connector for 
modern Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-431216581
 
 
   @alexeyt820 I have moved these two issues into FLINK-10598 as sub issues.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide connector for modern Kafka
> --
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Kafka 2.0.0 will be released soon.
> Here is the vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide a connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation : 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656140#comment-16656140
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

yanghua commented on issue #6703: [FLINK-9697] Provide connector for modern 
Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-431216581
 
 
   @alexeyt820 I have moved these two issues into FLINK-10598 as a sub issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide connector for modern Kafka
> --
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Kafka 2.0.0 will be released soon.
> Here is the vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide a connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation : 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua edited a comment on issue #6703: [FLINK-9697] Provide connector for modern Kafka

2018-10-18 Thread GitBox
yanghua edited a comment on issue #6703: [FLINK-9697] Provide connector for 
modern Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-431216581
 
 
   @alexeyt820 I have moved these two issues into FLINK-10598 as sub issues.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka

2018-10-18 Thread GitBox
yanghua commented on issue #6703: [FLINK-9697] Provide connector for modern 
Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-431216581
 
 
   @alexeyt820 I have moved these two issues into FLINK-10598 as a sub issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-10-18 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656137#comment-16656137
 ] 

vinoyang commented on FLINK-8500:
-

Hi, if the modern Kafka connector also has this issue, shall we remove 
*Kafka010Fetcher* from the title of the issue?

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  
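
A sketch of what the requested change could look like; this is an illustration
of the proposal only, not the merged interface, and both the interface name and
the parameter order are invented here:

{code:java}
// Hypothetical variant of KeyedDeserializationSchema#deserialize that also
// receives the ConsumerRecord timestamp. Illustrative, not Flink's API.
public interface TimestampedKeyedDeserializationSchema<T> {
    T deserialize(
        byte[] messageKey,
        byte[] message,
        String topic,
        int partition,
        long offset,
        long timestamp) throws java.io.IOException;
}
{code}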



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-10-18 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-8500:

Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-10598

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-10-18 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-8354:

Issue Type: Sub-task  (was: Bug)
Parent: FLINK-10598

> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka introduced the notion of a Header for messages in version 0.11.0.0: 
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11, which supports Kafka 0.11.0.0, ignores 
> headers when consuming Kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> expose message headers in FlinkKafkaConsumer011 and FlinkKafkaProducer011. 
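
For context, headers are already available on the raw Kafka consumer since
0.11; the ticket is about surfacing them through the Flink deserialization
path. A plain kafka-clients sketch (not Flink connector code; the method name
is made up):

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class HeaderDump {
    // Prints all headers of a record. ConsumerRecord#headers() exists in the
    // Kafka client API since 0.11; this helper itself is hypothetical.
    static void dumpHeaders(ConsumerRecord<byte[], byte[]> record) {
        for (Header header : record.headers()) {
            // new String uses the platform charset; fine for a sketch only
            System.out.println(header.key() + " -> " + new String(header.value()));
        }
    }
}
{code}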



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656102#comment-16656102
 ] 

ASF GitHub Bot commented on FLINK-9970:
---

xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226506688
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ##
 @@ -252,6 +252,17 @@ object ScalarFunctions {
 regexp_extract(str, regex, 0)
   }
 
+  /**
+* Returns a numeric value of the leftmost character of the string str.
+*/
+  def ascii(str: String): Integer = {
+if (str == null || str.equals("")) {
+  0
 
 Review comment:
   Yes, the results "coincidentally" turn out to be null, but I still suggest 
explicitly returning a `null` value instead of a normal result.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ASCII/CHR function for table/sql API
> 
>
> Key: FLINK-9970
> URL: https://issues.apache.org/jira/browse/FLINK-9970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> for ASCII function: 
> refer to: 
> [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii]
> for CHR function: 
> This function converts an ASCII code to a character;
> refer to: [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html]
> Considering "CHAR" is always a keyword in many databases, we use the "CHR" 
> keyword.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API

2018-10-18 Thread GitBox
xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226506688
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ##
 @@ -252,6 +252,17 @@ object ScalarFunctions {
 regexp_extract(str, regex, 0)
   }
 
+  /**
+* Returns a numeric value of the leftmost character of the string str.
+*/
+  def ascii(str: String): Integer = {
+if (str == null || str.equals("")) {
+  0
 
 Review comment:
   Yes, the results "coincidentally" turn out to be null, but I still suggest 
explicitly returning a `null` value instead of a normal result.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656099#comment-16656099
 ] 

ASF GitHub Bot commented on FLINK-9970:
---

xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226506284
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ##
 @@ -252,6 +252,17 @@ object ScalarFunctions {
 regexp_extract(str, regex, 0)
   }
 
+  /**
+* Returns a numeric value of the leftmost character of the string str.
+*/
+  def ascii(str: String): Integer = {
+if (str == null || str.equals("")) {
+  0
+} else {
+  str.charAt(0).toByte.toInt
 
 Review comment:
   Could you explain more about this behavior?
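
One way to read the question (my reading, not the reviewer's words): casting a
char to a byte truncates to a signed 8-bit value, so characters above 127 come
out negative rather than matching the MySQL ASCII reference linked in the
ticket. A small Java illustration:

{code:java}
public class ToByteDemo {
    public static void main(String[] args) {
        char y = 'ÿ';                        // U+00FF
        System.out.println((int) y);         // 255 (what MySQL ASCII('ÿ') yields)
        System.out.println((int) (byte) y);  // -1  (what toByte.toInt produces)
    }
}
{code}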


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ASCII/CHR function for table/sql API
> 
>
> Key: FLINK-9970
> URL: https://issues.apache.org/jira/browse/FLINK-9970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> for ASCII function: 
> refer to: 
> [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii]
> for CHR function: 
> This function converts an ASCII code to a character;
> refer to: [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html]
> Considering "CHAR" is always a keyword in many databases, we use the "CHR" 
> keyword.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656097#comment-16656097
 ] 

ASF GitHub Bot commented on FLINK-9970:
---

xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226506162
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -889,6 +916,69 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "")
   }
 
+  @Test
+  def testChr(): Unit = {
+testAllApis(
+  'f14.chr(),
+  "f14.chr()",
+  "CHR(f14)",
+  "null")
+
+testAllApis(
+  'f34.chr(),
+  "f34.chr()",
+  "CHR(f34)",
+  "null")
+
+testAllApis(
+  'f34.chr(),
+  "f34.chr()",
+  "CHR(f34)",
+  "null")
+
+testAllApis(
+  'f36.chr(),
+  "f36.chr()",
+  "CHR(f36)",
+  "A")
+
+testAllApis(
+  'f37.chr(),
+  "f37.chr()",
+  "CHR(f37)",
+  "a")
+
+testAllApis(
+  'f38.chr(),
+  "f38.chr()",
+  "CHR(f38)",
+  "ÿ")
+
+testAllApis(
 
 Review comment:
   This is duplicated with a test case above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ASCII/CHR function for table/sql API
> 
>
> Key: FLINK-9970
> URL: https://issues.apache.org/jira/browse/FLINK-9970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> for ASCII function: 
> refer to: 
> [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii]
> for CHR function: 
> This function converts an ASCII code to a character;
> refer to: [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html]
> Considering "CHAR" is always a keyword in many databases, we use the "CHR" 
> keyword.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API

2018-10-18 Thread GitBox
xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226506284
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
 ##
 @@ -252,6 +252,17 @@ object ScalarFunctions {
 regexp_extract(str, regex, 0)
   }
 
+  /**
+* Returns a numeric value of the leftmost character of the string str.
+*/
+  def ascii(str: String): Integer = {
+if (str == null || str.equals("")) {
+  0
+} else {
+  str.charAt(0).toByte.toInt
 
 Review comment:
   Could you explain more about this behavior?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API

2018-10-18 Thread GitBox
xccui commented on a change in pull request #6432: [FLINK-9970] [table] Add 
ASCII/CHR function for table/sql API
URL: https://github.com/apache/flink/pull/6432#discussion_r226506162
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 ##
 @@ -889,6 +916,69 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "")
   }
 
+  @Test
+  def testChr(): Unit = {
+testAllApis(
+  'f14.chr(),
+  "f14.chr()",
+  "CHR(f14)",
+  "null")
+
+testAllApis(
+  'f34.chr(),
+  "f34.chr()",
+  "CHR(f34)",
+  "null")
+
+testAllApis(
+  'f34.chr(),
+  "f34.chr()",
+  "CHR(f34)",
+  "null")
+
+testAllApis(
+  'f36.chr(),
+  "f36.chr()",
+  "CHR(f36)",
+  "A")
+
+testAllApis(
+  'f37.chr(),
+  "f37.chr()",
+  "CHR(f37)",
+  "a")
+
+testAllApis(
+  'f38.chr(),
+  "f38.chr()",
+  "CHR(f38)",
+  "ÿ")
+
+testAllApis(
 
 Review comment:
   This is duplicated with a test case above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages

2018-10-18 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655692#comment-16655692
 ] 

Fabian Hueske commented on FLINK-10566:
---

This issue only affects DataSet programs. The DataStream API does not optimize 
the plans.
To be honest, this might be a major issue. The optimizer has not been developed 
for quite some time (besides some bugfixes) and changing the plan enumeration 
logic is not really easy.

One thing that might work is to run the optimizer on subplans and connect the 
optimal subplans. However, we would need to identify the edges at which we want 
to split the plan. One challenge here is that the DataSet API allows for 
branching and merging plans (in contrast to most relational optimizers). 

What you could also try is to pass optimizer hints yourself. That would reduce 
the search space because the execution strategies would be fixed.
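
As an illustration of the hint-based workaround: the DataSet join API accepts a
JoinHint that pins the execution strategy, so the optimizer does not have to
enumerate alternatives for that operator. A sketch with made-up inputs:

{code:java}
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class HintedJoin {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Integer, String>> left =
            env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));
        DataSet<Tuple2<Integer, String>> right =
            env.fromElements(Tuple2.of(1, "x"), Tuple2.of(2, "y"));

        // Fixing the strategy by hand spares the optimizer from enumerating it.
        left.join(right, JoinHint.BROADCAST_HASH_SECOND)
            .where(0)
            .equalTo(0)
            .print();
    }
}
{code}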

> Flink Planning is exponential in the number of stages
> -
>
> Key: FLINK-10566
> URL: https://issues.apache.org/jira/browse/FLINK-10566
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.5.4
>Reporter: Robert Bradshaw
>Priority: Major
> Attachments: chart.png
>
>
> This makes it nearly impossible to run graphs with 100 or more stages. (The 
> execution itself is still sub-second, but the job submission takes 
> increasingly long.)
> I can reproduce this with the following pipeline, which resembles my 
> real-world workloads (with depth up to 10 and width up to, and past, 50). On 
> Flink it seems getting beyond width 10 is problematic (times out after 
> hours). Note the log scale on the chart for time. 
>  
> {code:java}
>   public static void runPipeline(int depth, int width) throws Exception {
> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> input = env.fromElements("a", "b", "c");
> DataSet<String> stats = null;
> for (int i = 0; i < depth; i++) {
>   stats = analyze(input, stats, width / (i + 1) + 1);
> }
> stats.writeAsText("out.txt");
> env.execute("depth " + depth + " width " + width);
>   }
>   public static DataSet<String> analyze(DataSet<String> input, 
> DataSet<String> stats, int branches) {
> System.out.println("analyze " + branches);
> for (int i = 0; i < branches; i++) {
>   final int ii = i;
>   if (stats != null) {
> input = input.map(new RichMapFunction<String, String>() {
> @Override
> public void open(Configuration parameters) throws Exception {
>   Collection<String> broadcastSet = 
> getRuntimeContext().getBroadcastVariable("stats");
> }
> @Override
> public String map(String value) throws Exception {
>   return value;
> }
>   }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
>   }
>   DataSet<String> branch = input
>.map(s -> new Tuple2<Integer, String>(0, s + ii))
>.groupBy(0)
>.minBy(1)
>.map(kv -> kv.f1);
>   if (stats == null) {
> stats = branch;
>   } else {
> stats = stats.union(branch);
>   }
> }
> return stats.map(s -> "(" + s + ").stats");
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages

2018-10-18 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655683#comment-16655683
 ] 

Maximilian Michels commented on FLINK-10566:


Just to clarify, this happens during Plan creation, which is specific to the 
batch execution. Afterwards, the Plan is converted into a JobGraph which is 
sent to the JobMaster for scheduling.

[~robertwb] Have you run a similar experiment for the DataStream API? Asking 
because streaming has a different way of creating the JobGraph.

[~fhueske] or [~till.rohrmann] Do you have an idea how we could fix this? This 
blocks us from running TFX pipelines on Flink.

> Flink Planning is exponential in the number of stages
> -
>
> Key: FLINK-10566
> URL: https://issues.apache.org/jira/browse/FLINK-10566
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.5.4
>Reporter: Robert Bradshaw
>Priority: Major
> Attachments: chart.png
>
>
> This makes it nearly impossible to run graphs with 100 or more stages. (The 
> execution itself is still sub-second, but the job submission takes 
> increasingly long.)
> I can reproduce this with the following pipeline, which resembles my 
> real-world workloads (with depth up to 10 and width up to, and past, 50). On 
> Flink it seems getting beyond width 10 is problematic (times out after 
> hours). Note the log scale on the chart for time. 
>  
> {code:java}
>   public static void runPipeline(int depth, int width) throws Exception {
> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> input = env.fromElements("a", "b", "c");
> DataSet<String> stats = null;
> for (int i = 0; i < depth; i++) {
>   stats = analyze(input, stats, width / (i + 1) + 1);
> }
> stats.writeAsText("out.txt");
> env.execute("depth " + depth + " width " + width);
>   }
>   public static DataSet<String> analyze(DataSet<String> input, 
> DataSet<String> stats, int branches) {
> System.out.println("analyze " + branches);
> for (int i = 0; i < branches; i++) {
>   final int ii = i;
>   if (stats != null) {
> input = input.map(new RichMapFunction<String, String>() {
> @Override
> public void open(Configuration parameters) throws Exception {
>   Collection<String> broadcastSet = 
> getRuntimeContext().getBroadcastVariable("stats");
> }
> @Override
> public String map(String value) throws Exception {
>   return value;
> }
>   }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
>   }
>   DataSet<String> branch = input
>.map(s -> new Tuple2<Integer, String>(0, s + ii))
>.groupBy(0)
>.minBy(1)
>.map(kv -> kv.f1);
>   if (stats == null) {
> stats = branch;
>   } else {
> stats = stats.union(branch);
>   }
> }
> return stats.map(s -> "(" + s + ").stats");
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10566) Flink Planning is exponential in the number of stages

2018-10-18 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655681#comment-16655681
 ] 

Maximilian Michels commented on FLINK-10566:


Here's a thread dump:
 
{noformat}

at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
at 

[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655663#comment-16655663
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

alexeyt820 commented on issue #6703: [FLINK-9697] Provide connector for modern 
Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-431100016
 
 
   @yanghua, FLINK-8354 (headers) and FLINK-8500 (timestamp), but it seems I 
don't have permission to change anything in these Jira issues.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide connector for modern Kafka
> --
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Kafka 2.0.0 will be released soon.
> Here is the vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide a connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation : 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] alexeyt820 commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka

2018-10-18 Thread GitBox
alexeyt820 commented on issue #6703: [FLINK-9697] Provide connector for modern 
Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-431100016
 
 
   @yanghua, FLINK-8354 (headers) and FLINK-8500 (timestamp), but it seems I 
don't have permission to change anything in these Jira issues.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase

2018-10-18 Thread GitBox
tillrohrmann commented on a change in pull request #6816: [FLINK-10527] Cleanup 
constant isNewMode in YarnTestBase
URL: https://github.com/apache/flink/pull/6816#discussion_r226396417
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 ##
 @@ -189,9 +189,9 @@ public void perJobYarnClusterOffHeap() throws IOException {
/**
 * Test TaskManager failure and also if the vcores are set correctly 
(see issue FLINK-2213).
 */
+   @Ignore
	@Test(timeout = 100000) // timeout after 100 seconds
public void testTaskManagerFailure() throws Exception {
-   assumeTrue("The new mode does not start TMs upfront.", 
!isNewMode);
 
 Review comment:
   I think the problem is that another precondition was violated, which 
caused a `MultipleFailureException` to be thrown, and because of that the 
`AssumptionViolatedException` was not properly treated as ignoring the 
respective test case.
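
For reference, this is the JUnit 4 mechanism in play: a failed assumption
marks the test as ignored, but only if the AssumptionViolatedException
surfaces on its own rather than wrapped inside a MultipleFailureException.
A minimal sketch (illustrative test, not from the Flink codebase):

{code:java}
import static org.junit.Assume.assumeTrue;

import org.junit.Test;

public class AssumptionDemo {
    @Test
    public void skippedWhenAssumptionFails() {
        // Throws AssumptionViolatedException; JUnit reports the test as
        // ignored, unless the exception gets wrapped by another failure.
        assumeTrue("precondition not met", false);
        // never reached
    }
}
{code}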


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655636#comment-16655636
 ] 

ASF GitHub Bot commented on FLINK-10527:


tillrohrmann commented on a change in pull request #6816: [FLINK-10527] Cleanup 
constant isNewMode in YarnTestBase
URL: https://github.com/apache/flink/pull/6816#discussion_r226396417
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 ##
 @@ -189,9 +189,9 @@ public void perJobYarnClusterOffHeap() throws IOException {
/**
 * Test TaskManager failure and also if the vcores are set correctly 
(see issue FLINK-2213).
 */
+   @Ignore
	@Test(timeout = 100000) // timeout after 100 seconds
public void testTaskManagerFailure() throws Exception {
-   assumeTrue("The new mode does not start TMs upfront.", 
!isNewMode);
 
 Review comment:
   I think the problem is that another precondition was violated, which 
caused a `MultipleFailureException` to be thrown, and because of that the 
`AssumptionViolatedException` was not properly treated as ignoring the 
respective test case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> This seems to be a residual problem with FLINK-10396; the constant is set to 
> true in that PR. Currently it has three usage scenarios:
> 1. In an assumption, where it causes an error:
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. In if (!isNewMode): the logic in the block is never invoked, so the if 
> block can be removed.
> 3. In if (isNewMode): the block is always invoked, so the if statement can 
> be removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on issue #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys

2018-10-18 Thread GitBox
tillrohrmann commented on issue #6872: [FLINK-10436] Add 
ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#issuecomment-431093823
 
 
   At the moment `rest.address` is effectively `dispatcher.rest.address`. The 
problem I see currently is that other components might already depend on 
`RestOption#ADDRESS`. If we now introduce a `RestOption#DISPATCHER_ADDRESS` 
with the fallback to `RestOption#ADDRESS` and `JobManagerOptions#ADDRESS` and 
remove the latter as a deprecated key from the former, then we break setups.
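
A sketch of the option layout under discussion, assuming the withFallbackKeys
API this PR proposes; the option name dispatcher.rest.address and the class are
illustrative, not existing Flink options:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class RestOptionsSketch {
    // Hypothetical option: resolves dispatcher.rest.address first, then falls
    // back to rest.address, then to the deprecated jobmanager.rpc.address.
    // withFallbackKeys is the method proposed by this PR, not yet merged.
    public static final ConfigOption<String> DISPATCHER_ADDRESS =
        ConfigOptions.key("dispatcher.rest.address")
            .noDefaultValue()
            .withFallbackKeys("rest.address")
            .withDeprecatedKeys("jobmanager.rpc.address");
}
{code}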


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655627#comment-16655627
 ] 

ASF GitHub Bot commented on FLINK-10436:


tillrohrmann commented on issue #6872: [FLINK-10436] Add 
ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#issuecomment-431093823
 
 
   At the moment `rest.address` is effectively `dispatcher.rest.address`. The 
problem I see currently is that other components might already depend on 
`RestOption#ADDRESS`. If we now introduce a `RestOption#DISPATCHER_ADDRESS` 
with the fallback to `RestOption#ADDRESS` and `JobManagerOptions#ADDRESS` and 
remove the latter as a deprecated key from the former, then we break setups.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Example config uses deprecated key jobmanager.rpc.address
> -
>
> Key: FLINK-10436
> URL: https://issues.apache.org/jira/browse/FLINK-10436
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The example {{flink-conf.yaml}} shipped as part of the Flink distribution 
> (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml)
>  has the following entry:
> {code}
> jobmanager.rpc.address: localhost
> {code}
> When using this key, the following deprecation warning is logged.
> {code}
> 2018-09-26 12:01:46,608 WARN  org.apache.flink.configuration.Configuration
>   - Config uses deprecated configuration key 
> 'jobmanager.rpc.address' instead of proper key 'rest.address'
> {code}
> The example config should not use deprecated config options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10569) Clean up uses of Scheduler and Instance in valid tests

2018-10-18 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655620#comment-16655620
 ] 

TisonKun edited comment on FLINK-10569 at 10/18/18 5:22 PM:


I find this topic is more complex than I ever thought. We actually have two 
versions of the slot implementation, and the current {{ExecutionGraph}} tests 
heavily depend on the legacy one. Maybe we should split this thread into 
stepwise issues that decouple the existing valid tests from the legacy slot 
implementation (as well as Scheduler and Instance).

cc [~till.rohrmann].


was (Author: tison):
I find this topic is more complex than I ever thought. We actually have two 
version of slot implementation and current {{ExecutionGraph}} tests heavily 
depend on legacy slot implementation. Maybe we should separated this thread 
into stepwise issues decouples existing valid tests with legacy slot 
implementation(as well as Scheduler and Instance).

cc [~till.rohrmann].

> Clean up uses of Scheduler and Instance in valid tests
> --
>
> Key: FLINK-10569
> URL: https://issues.apache.org/jira/browse/FLINK-10569
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.7.0
>
>
> The legacy classes {{Scheduler}} and {{Instance}} are still used in some 
> valid tests like {{ExecutionGraphRestartTest}}. We should replace them with 
> the FLIP-6 schedule mode. The best way I can find is to use 
> {{SimpleSlotProvider}}.
> Note that we need not remove all use points across all files, since most of 
> them stay in the legacy codebase (e.g. {{JobManager.scala}}) and will be 
> removed later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10569) Clean up uses of Scheduler and Instance in valid tests

2018-10-18 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655620#comment-16655620
 ] 

TisonKun edited comment on FLINK-10569 at 10/18/18 5:22 PM:


I find this topic is more complex than I ever thought. We actually have two 
version of slot implementation and current {{ExecutionGraph}} tests heavily 
depend on legacy slot implementation. Maybe we should separated this thread 
into stepwise issues decouples existing valid tests with legacy slot 
implementation(as well as Scheduler and Instance).

cc [~till.rohrmann].


was (Author: tison):
I find this topic is more complex than I ever thought. We actually have two 
version of slot implementation and current {{ExecutionGraph}} related tests 
heavily depend on legacy slot implementation. Maybe we should separated this 
thread into stepwise issues decouples existing valid tests with legacy slot 
implementation(as well as Scheduler and Instance).

cc [~till.rohrmann].

> Clean up uses of Scheduler and Instance in valid tests
> --
>
> Key: FLINK-10569
> URL: https://issues.apache.org/jira/browse/FLINK-10569
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.7.0
>
>
> The legacy classes {{Scheduler}} and {{Instance}} are still used in some 
> valid tests like {{ExecutionGraphRestartTest}}. We should replace them with 
> the FLIP-6 schedule mode. The best way I can find is to use 
> {{SimpleSlotProvider}}.
> Note that we need not remove all use points across all files, since most of 
> them stay in the legacy codebase (e.g. {{JobManager.scala}}) and will be 
> removed later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10569) Clean up uses of Scheduler and Instance in valid tests

2018-10-18 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655620#comment-16655620
 ] 

TisonKun commented on FLINK-10569:
--

I find this topic more complex than I ever thought. We actually have two 
versions of the slot implementation, and the current {{ExecutionGraph}}-related 
tests heavily depend on the legacy one. Maybe we should separate this thread 
into stepwise issues that decouple the existing valid tests from the legacy 
slot implementation (as well as Scheduler and Instance).

cc [~till.rohrmann].

> Clean up uses of Scheduler and Instance in valid tests
> --
>
> Key: FLINK-10569
> URL: https://issues.apache.org/jira/browse/FLINK-10569
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.7.0
>
>
> Legacy classes {{Scheduler}} and {{Instance}} are still used in some valid 
> tests like {{ExecutionGraphRestartTest}}. We should replace them with the 
> FLIP-6 scheduling mode. The best way I can find is to use 
> {{SimpleSlotProvider}}.
> Note that we do not need to remove all use points across all files, since 
> most of them stay in the legacy codebase (like {{JobManager.scala}}) and 
> will be removed later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10251) Handle oversized response messages in AkkaRpcActor

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655615#comment-16655615
 ] 

ASF GitHub Bot commented on FLINK-10251:


tillrohrmann commented on a change in pull request #6876: [FLINK-10251] Handle 
oversized response messages in AkkaRpcActor
URL: https://github.com/apache/flink/pull/6876#discussion_r226393468
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 ##
 @@ -108,9 +111,11 @@
 
private volatile boolean stopped;
 
-   public AkkaRpcService(final ActorSystem actorSystem, final Time 
timeout) {
+   public AkkaRpcService(final ActorSystem actorSystem, final 
Configuration configuration) {
 
 Review comment:
   Yes let's introduce an `AkkaRpcServiceConfiguration`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Handle oversized response messages in AkkaRpcActor
> --
>
> Key: FLINK-10251
> URL: https://issues.apache.org/jira/browse/FLINK-10251
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The {{AkkaRpcActor}} should check that an RPC response sent to a remote 
> sender does not exceed the maximum framesize of the underlying 
> {{ActorSystem}}. If it does, we should fail fast instead. We can achieve 
> this by serializing the response and sending the serialized byte array.
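For illustration, a minimal sketch of that check, assuming an actor context 
and a {{maximumFramesize}} field derived from the Akka configuration (names 
are illustrative, not the merged implementation):

{code:java}
private void sendResponse(ActorRef sender, Object result) throws IOException {
	// serialize the response up front so its size is known before sending
	byte[] serializedResult = InstantiationUtil.serializeObject(result);

	if (serializedResult.length > maximumFramesize) {
		// fail fast locally instead of letting Akka drop the oversized message
		sender.tell(
			new Status.Failure(new IOException(
				"The RPC response of " + serializedResult.length + " bytes exceeds " +
					"the maximum framesize of " + maximumFramesize + " bytes.")),
			getSelf());
	} else {
		// ship the raw bytes; the calling side deserializes the value
		sender.tell(new Status.Success(serializedResult), getSelf());
	}
}
{code}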



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #6876: [FLINK-10251] Handle oversized response messages in AkkaRpcActor

2018-10-18 Thread GitBox
tillrohrmann commented on a change in pull request #6876: [FLINK-10251] Handle 
oversized response messages in AkkaRpcActor
URL: https://github.com/apache/flink/pull/6876#discussion_r226393468
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 ##
 @@ -108,9 +111,11 @@
 
private volatile boolean stopped;
 
-   public AkkaRpcService(final ActorSystem actorSystem, final Time 
timeout) {
+   public AkkaRpcService(final ActorSystem actorSystem, final 
Configuration configuration) {
 
 Review comment:
   Yes let's introduce an `AkkaRpcServiceConfiguration`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10602) Run MetricFetcher in metrics ActorSystem

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655600#comment-16655600
 ] 

ASF GitHub Bot commented on FLINK-10602:


tillrohrmann closed pull request #6877: [FLINK-10602] Use metric's ActorSystem 
in MetricFetcher
URL: https://github.com/apache/flink/pull/6877
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 35fad322ef9..150f17fa04c 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -54,6 +54,7 @@
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
@@ -219,6 +220,9 @@ private void runCluster(Configuration configuration) throws 
Exception {
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
+   new AkkaQueryServiceRetriever(
+   metricQueryServiceActorSystem,
+   
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
this);
 
clusterComponent.getShutDownFuture().whenComplete(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 6d557d0815f..354245d29ba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -22,7 +22,6 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -46,16 +45,14 @@
 import org.apache.flink.runtime.rest.RestEndpointFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
-import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,6 +99,7 @@ public AbstractDispatcherResourceManagerComponentFactory(
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
+   MetricQueryServiceRetriever metricQueryServiceRetriever,
FatalErrorHandler fatalErrorHandler) throws Exception {
 
LeaderRetrievalService dispatcherLeaderRetrievalService = null;
@@ -130,10 +128,6 @@ public AbstractDispatcherResourceManagerComponentFactory(
10,
Time.milliseconds(50L));
 
-   // TODO: Remove once we have ported the MetricFetcher 
to the RpcEndpoint
-   final ActorSystem actorSystem = ((AkkaRpcService) 
rpcService).getActorSystem();
-   final Time timeout = 
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
-
 

[jira] [Resolved] (FLINK-10602) Run MetricFetcher in metrics ActorSystem

2018-10-18 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10602.
---
Resolution: Fixed

Fixed via 
https://github.com/apache/flink/commit/f6a7100507c4d6eda25c10618f0ad028ffef32d8

> Run MetricFetcher in metrics ActorSystem
> 
>
> Key: FLINK-10602
> URL: https://issues.apache.org/jira/browse/FLINK-10602
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> After letting the {{MetricQueryService}} run in a separate {{ActorSystem}}, 
> we should also let the {{MetricFetcher}} use the same {{ActorSystem}} to 
> fetch the metrics. That way it will be possible to configure the metric 
> {{ActorSystem}} differently from the {{RpcService}} {{ActorSystem}} (e.g. a 
> different frame size).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann closed pull request #6877: [FLINK-10602] Use metric's ActorSystem in MetricFetcher

2018-10-18 Thread GitBox
tillrohrmann closed pull request #6877: [FLINK-10602] Use metric's ActorSystem 
in MetricFetcher
URL: https://github.com/apache/flink/pull/6877
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 35fad322ef9..150f17fa04c 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -54,6 +54,7 @@
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
@@ -219,6 +220,9 @@ private void runCluster(Configuration configuration) throws 
Exception {
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
+   new AkkaQueryServiceRetriever(
+   metricQueryServiceActorSystem,
+   
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
this);
 
clusterComponent.getShutDownFuture().whenComplete(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
index 6d557d0815f..354245d29ba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractDispatcherResourceManagerComponentFactory.java
@@ -22,7 +22,6 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -46,16 +45,14 @@
 import org.apache.flink.runtime.rest.RestEndpointFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
-import 
org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
-import akka.actor.ActorSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,6 +99,7 @@ public AbstractDispatcherResourceManagerComponentFactory(
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
+   MetricQueryServiceRetriever metricQueryServiceRetriever,
FatalErrorHandler fatalErrorHandler) throws Exception {
 
LeaderRetrievalService dispatcherLeaderRetrievalService = null;
@@ -130,10 +128,6 @@ public AbstractDispatcherResourceManagerComponentFactory(
10,
Time.milliseconds(50L));
 
-   // TODO: Remove once we have ported the MetricFetcher 
to the RpcEndpoint
-   final ActorSystem actorSystem = ((AkkaRpcService) 
rpcService).getActorSystem();
-   final Time timeout = 
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
-
webMonitorEndpoint = 
restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
@@ -143,7 +137,7 @@ public 

[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655598#comment-16655598
 ] 

ASF GitHub Bot commented on FLINK-10436:


zentol commented on issue #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#issuecomment-431087755
 
 
   I understood what the fallback is supposed to do, but it doesn't make sense 
that something generic like `rest.address` uses the very specific 
`jobmanager.rpc.address` as a fallback. Falling back to a _more specific_ 
option is conceptually not sound.
   If you want to make the fallback behavior reusable, I'd suggest a dedicated 
`dispatcher.rest.port` option with fallbacks to `rest.port` and 
`jobmanager.rpc.address`, which you then use to configure the options that the 
`RestEndpoint` expects.
   This way you retain the convenient fallback behavior without coupling the 
REST configuration to any other component.
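   For illustration, a sketch of such a dedicated option using the 
`withFallbackKeys` builder from this PR (`dispatcher.rest.port` is the 
hypothetical key from this comment, not an existing option):
   
   ```java
   // The dedicated key is consulted first; the fallback key is only read
   // when the primary key is not configured.
   public static final ConfigOption<Integer> DISPATCHER_REST_PORT =
       ConfigOptions.key("dispatcher.rest.port")
           .defaultValue(8081)
           .withFallbackKeys("rest.port");
   ```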


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Example config uses deprecated key jobmanager.rpc.address
> -
>
> Key: FLINK-10436
> URL: https://issues.apache.org/jira/browse/FLINK-10436
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The example {{flink-conf.yaml}} shipped as part of the Flink distribution 
> (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml)
>  has the following entry:
> {code}
> jobmanager.rpc.address: localhost
> {code}
> When using this key, the following deprecation warning is logged.
> {code}
> 2018-09-26 12:01:46,608 WARN  org.apache.flink.configuration.Configuration
>   - Config uses deprecated configuration key 
> 'jobmanager.rpc.address' instead of proper key 'rest.address'
> {code}
> The example config should not use deprecated config options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on issue #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys

2018-10-18 Thread GitBox
zentol commented on issue #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#issuecomment-431087755
 
 
   I understood what the fallback is supposed to do, but it doesn't make sense 
that something generic like `rest.address` uses the very specific 
`jobmanager.rpc.address` as a fallback. Falling back to a _more specific_ 
option is conceptually not sound.
   If you want to make the fallback behavior reusable, I'd suggest a dedicated 
`dispatcher.rest.port` option with fallbacks to `rest.port` and 
`jobmanager.rpc.address`, which you then use to configure the options that the 
`RestEndpoint` expects.
   This way you retain the convenient fallback behavior without coupling the 
REST configuration to any other component.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10309) Cancel flink job occurs java.net.ConnectException

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1663#comment-1663
 ] 

ASF GitHub Bot commented on FLINK-10309:


GJL commented on issue #6785: [FLINK-10309][rest] Before shutting down cluster, 
wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#issuecomment-431078873
 
 
   @zentol Thanks for reviewing. I will keep the refactoring in a separate 
commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cancel flink job occurs java.net.ConnectException
> -
>
> Key: FLINK-10309
> URL: https://issues.apache.org/jira/browse/FLINK-10309
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: vinoyang
>Assignee: Gary Yao
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The problem occurs when using the YARN per-job detached mode. Trying to 
> cancel with a savepoint fails with the following exception before being able 
> to retrieve the savepoint path:
> Exception stack trace: 
> {code:java}
> org.apache.flink.util.FlinkException: Could not cancel job .
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
>         at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
>         at 
> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
>         at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Number of retries has been exhausted.
>         at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>         at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at 
> org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
>         ... 6 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
> Could not complete the operation. Number of retries has been exhausted.
>         at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>         at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>         at 
> 

[GitHub] GJL commented on issue #6785: [FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations

2018-10-18 Thread GitBox
GJL commented on issue #6785: [FLINK-10309][rest] Before shutting down cluster, 
wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785#issuecomment-431078873
 
 
   @zentol Thanks for reviewing. I will keep the refactoring in a separate 
commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10555) Port AkkaSslITCase to new code base

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655539#comment-16655539
 ] 

ASF GitHub Bot commented on FLINK-10555:


TisonKun commented on issue #6849: [FLINK-10555] [test] Port AkkaSslITCase to 
new code base
URL: https://github.com/apache/flink/pull/6849#issuecomment-431075922
 
 
   @tillrohrmann yes I think those are covered by the mentioned existing tests. 
Moving negative tests to `BlobServerSslTest` as suggested.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port AkkaSslITCase to new code base
> ---
>
> Key: FLINK-10555
> URL: https://issues.apache.org/jira/browse/FLINK-10555
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{AkkaSslITCase}} to new code base, as {{MiniClusterSslITCase}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6849: [FLINK-10555] [test] Port AkkaSslITCase to new code base

2018-10-18 Thread GitBox
TisonKun commented on issue #6849: [FLINK-10555] [test] Port AkkaSslITCase to 
new code base
URL: https://github.com/apache/flink/pull/6849#issuecomment-431075922
 
 
   @tillrohrmann yes I think those are covered by the mentioned existing tests. 
Moving negative tests to `BlobServerSslTest` as suggested.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10563) Expose shaded Presto S3 filesystem under "s3p" scheme

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655529#comment-16655529
 ] 

ASF GitHub Bot commented on FLINK-10563:


aljoscha commented on issue #6855: [FLINK-10563] Expose shaded Presto S3 
filesystem under "s3p" scheme
URL: https://github.com/apache/flink/pull/6855#issuecomment-431074098
 
 
   will update both, thanks!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Expose shaded Presto S3 filesystem under "s3p" scheme
> -
>
> Key: FLINK-10563
> URL: https://issues.apache.org/jira/browse/FLINK-10563
> Project: Flink
>  Issue Type: Improvement
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, you can't use the shaded Hadoop S3 filesystem and the Presto S3 
> filesystem at the same time. If we exposed the Presto filesystem under an 
> additional scheme, we would enable using both at the same time.
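For illustration, with the extra scheme registered a job could pick the 
implementation per path; a minimal sketch (bucket and paths are examples):

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// served by the shaded Hadoop S3 filesystem
DataStream<String> input = env.readTextFile("s3://my-bucket/input");

// served by the Presto S3 filesystem via the proposed additional "s3p" scheme
input.writeAsText("s3p://my-bucket/output");
env.execute("s3 scheme example");
{code}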



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] aljoscha commented on issue #6855: [FLINK-10563] Expose shaded Presto S3 filesystem under "s3p" scheme

2018-10-18 Thread GitBox
aljoscha commented on issue #6855: [FLINK-10563] Expose shaded Presto S3 
filesystem under "s3p" scheme
URL: https://github.com/apache/flink/pull/6855#issuecomment-431074098
 
 
   will update both, thanks!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9970) Add ASCII/CHR function for table/sql API

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655509#comment-16655509
 ] 

ASF GitHub Bot commented on FLINK-9970:
---

yanghua commented on issue #6432: [FLINK-9970] [table] Add ASCII/CHR function 
for table/sql API
URL: https://github.com/apache/flink/pull/6432#issuecomment-431070571
 
 
   @xccui Any more review comments?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ASCII/CHR function for table/sql API
> 
>
> Key: FLINK-9970
> URL: https://issues.apache.org/jira/browse/FLINK-9970
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> For the ASCII function, refer to: 
> [https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_ascii]
> For the CHR function, which converts an ASCII code to a character, refer to: 
> [https://doc.ispirer.com/sqlways/Output/SQLWays-1-071.html]
> Considering that "CHAR" is a keyword in many databases, we use the "CHR" 
> keyword instead.
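Assuming the functions land under the proposed names, SQL usage would look 
roughly like this ({{tableEnv}} and {{source}} are placeholders):

{code:java}
// Sketch: ASCII returns the code of the first character, CHR is its inverse.
Table result = tableEnv.sqlQuery("SELECT ASCII('Flink'), CHR(70) FROM source");
// yields 70 and 'F'
{code}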



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6432: [FLINK-9970] [table] Add ASCII/CHR function for table/sql API

2018-10-18 Thread GitBox
yanghua commented on issue #6432: [FLINK-9970] [table] Add ASCII/CHR function 
for table/sql API
URL: https://github.com/apache/flink/pull/6432#issuecomment-431070571
 
 
   @xccui Any more review comments?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655493#comment-16655493
 ] 

ASF GitHub Bot commented on FLINK-10384:


pnowojski closed pull request #6730: [FLINK-10384][table] Add Sinh math 
function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 37171a2079f..e47b31772b3 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -1197,6 +1197,18 @@ SIN(numeric)
   
 
 
+
+  
+{% highlight text %}
+SINH(numeric)
+{% endhighlight %}
+  
+  
+Returns the hyperbolic sine of numeric. 
+The return type is DOUBLE.
+  
+
+
 
   
 {% highlight text %}
@@ -1656,6 +1668,18 @@ NUMERIC.sin()
   
 
 
+
+  
+{% highlight java %}
+NUMERIC.sinh()
+{% endhighlight %}
+  
+  
+Returns the hyperbolic sine of NUMERIC. 
+The return type is DOUBLE.
+  
+
+
 
   
 {% highlight java %}
@@ -2116,6 +2140,18 @@ NUMERIC.sin()
   
 
 
+
+  
+{% highlight scala %}
+NUMERIC.sinh()
+{% endhighlight %}
+  
+  
+Returns the hyperbolic sine of NUMERIC. 
+The return type is DOUBLE.
+  
+
+
 
   
 {% highlight scala %}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 7c585674b54..f9fb93cb7fa 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -357,6 +357,11 @@ trait ImplicitExpressionOperations {
 */
   def floor() = Floor(expr)
 
+  /**
+* Calculates the hyperbolic sine of a given value.
+*/
+  def sinh() = Sinh(expr)
+
   /**
 * Calculates the smallest integer greater than or equal to a given number.
 */
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 1ae6e39e073..8abe55db8d5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -84,6 +84,9 @@ object BuiltInMethods {
   val ACOS = Types.lookupMethod(classOf[Math], "acos", classOf[Double])
   val ACOS_DEC = Types.lookupMethod(classOf[SqlFunctions], "acos", 
classOf[JBigDecimal])
 
+  val SINH = Types.lookupMethod(classOf[Math], "sinh", classOf[Double])
+  val SINH_DEC = Types.lookupMethod(classOf[ScalarFunctions], "sinh", 
classOf[JBigDecimal])
+
   val ATAN = Types.lookupMethod(classOf[Math], "atan", classOf[Double])
   val ATAN_DEC = Types.lookupMethod(classOf[SqlFunctions], "atan", 
classOf[JBigDecimal])
 
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 9a6aeb15b3b..c7070400c9f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -241,6 +241,18 @@ object FunctionGenerator {
 DOUBLE_TYPE_INFO,
 BuiltInMethods.LN)
 
+  addSqlFunctionMethod(
+SINH,
+Seq(DOUBLE_TYPE_INFO),
+DOUBLE_TYPE_INFO,
+BuiltInMethods.SINH)
+
+  addSqlFunctionMethod(
+SINH,
+Seq(BIG_DEC_TYPE_INFO),
+DOUBLE_TYPE_INFO,
+BuiltInMethods.SINH_DEC)
+
   addSqlFunctionMethod(
 EXP,
 Seq(DOUBLE_TYPE_INFO),
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
index 97e7190c0eb..05539deb711 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
@@ -165,6 +165,20 @@ case class Power(left: Expression, right: Expression) 
extends BinaryExpression w
   }
 }
 
+case class Sinh(child: 

[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655492#comment-16655492
 ] 

ASF GitHub Bot commented on FLINK-10384:


pnowojski commented on issue #6730: [FLINK-10384][table] Add Sinh math function 
supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730#issuecomment-431066934
 
 
   Thanks @yanghua :) Merging


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Sinh math function supported in Table API and SQL
> -
>
> Key: FLINK-10384
> URL: https://issues.apache.org/jira/browse/FLINK-10384
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Like FLINK-10340, which added the Cosh math function.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski closed pull request #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL

2018-10-18 Thread GitBox
pnowojski closed pull request #6730: [FLINK-10384][table] Add Sinh math 
function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/functions.md b/docs/dev/table/functions.md
index 37171a2079f..e47b31772b3 100644
--- a/docs/dev/table/functions.md
+++ b/docs/dev/table/functions.md
@@ -1197,6 +1197,18 @@ SIN(numeric)
   
 
 
+
+  
+{% highlight text %}
+SINH(numeric)
+{% endhighlight %}
+  
+  
+Returns the hyperbolic sine of numeric. 
+The return type is DOUBLE.
+  
+
+
 
   
 {% highlight text %}
@@ -1656,6 +1668,18 @@ NUMERIC.sin()
   
 
 
+
+  
+{% highlight java %}
+NUMERIC.sinh()
+{% endhighlight %}
+  
+  
+Returns the hyperbolic sine of NUMERIC. 
+The return type is DOUBLE.
+  
+
+
 
   
 {% highlight java %}
@@ -2116,6 +2140,18 @@ NUMERIC.sin()
   
 
 
+
+  
+{% highlight scala %}
+NUMERIC.sinh()
+{% endhighlight %}
+  
+  
+Returns the hyperbolic sine of NUMERIC. 
+The return type is DOUBLE.
+  
+
+
 
   
 {% highlight scala %}
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 7c585674b54..f9fb93cb7fa 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -357,6 +357,11 @@ trait ImplicitExpressionOperations {
 */
   def floor() = Floor(expr)
 
+  /**
+* Calculates the hyperbolic sine of a given value.
+*/
+  def sinh() = Sinh(expr)
+
   /**
 * Calculates the smallest integer greater than or equal to a given number.
 */
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
index 1ae6e39e073..8abe55db8d5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala
@@ -84,6 +84,9 @@ object BuiltInMethods {
   val ACOS = Types.lookupMethod(classOf[Math], "acos", classOf[Double])
   val ACOS_DEC = Types.lookupMethod(classOf[SqlFunctions], "acos", 
classOf[JBigDecimal])
 
+  val SINH = Types.lookupMethod(classOf[Math], "sinh", classOf[Double])
+  val SINH_DEC = Types.lookupMethod(classOf[ScalarFunctions], "sinh", 
classOf[JBigDecimal])
+
   val ATAN = Types.lookupMethod(classOf[Math], "atan", classOf[Double])
   val ATAN_DEC = Types.lookupMethod(classOf[SqlFunctions], "atan", 
classOf[JBigDecimal])
 
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
index 9a6aeb15b3b..c7070400c9f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
@@ -241,6 +241,18 @@ object FunctionGenerator {
 DOUBLE_TYPE_INFO,
 BuiltInMethods.LN)
 
+  addSqlFunctionMethod(
+SINH,
+Seq(DOUBLE_TYPE_INFO),
+DOUBLE_TYPE_INFO,
+BuiltInMethods.SINH)
+
+  addSqlFunctionMethod(
+SINH,
+Seq(BIG_DEC_TYPE_INFO),
+DOUBLE_TYPE_INFO,
+BuiltInMethods.SINH_DEC)
+
   addSqlFunctionMethod(
 EXP,
 Seq(DOUBLE_TYPE_INFO),
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
index 97e7190c0eb..05539deb711 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
@@ -165,6 +165,20 @@ case class Power(left: Expression, right: Expression) 
extends BinaryExpression w
   }
 }
 
+case class Sinh(child: Expression) extends UnaryExpression {
+
+  override private[flink] def resultType: TypeInformation[_] = 
DOUBLE_TYPE_INFO;
+
+  override private[flink] def validateInput(): ValidationResult =
+TypeCheckUtils.assertNumericExpr(child.resultType, 

[GitHub] pnowojski commented on issue #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL

2018-10-18 Thread GitBox
pnowojski commented on issue #6730: [FLINK-10384][table] Add Sinh math function 
supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730#issuecomment-431066934
 
 
   Thanks @yanghua :) Merging


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10251) Handle oversized response messages in AkkaRpcActor

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655486#comment-16655486
 ] 

ASF GitHub Bot commented on FLINK-10251:


yanghua commented on a change in pull request #6876: [FLINK-10251] Handle 
oversized response messages in AkkaRpcActor
URL: https://github.com/apache/flink/pull/6876#discussion_r226366888
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 ##
 @@ -108,9 +111,11 @@
 
private volatile boolean stopped;
 
-   public AkkaRpcService(final ActorSystem actorSystem, final Time 
timeout) {
+   public AkkaRpcService(final ActorSystem actorSystem, final 
Configuration configuration) {
 
 Review comment:
   Sorry, I forgot to refactor this code segment:
   
   ```
   if (actorSystem.settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
       maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
   } else {
       // only local communication
       maximumFramesize = Long.MAX_VALUE;
   }
   ```
   
   to use
   
   ```
   configuration.getString(AkkaOptions.FRAMESIZE);
   ```
   
   (this suggestion is from @zentol)
   
   If I do this, I think it's better to consolidate the scattered config items 
into one Configuration object so that we can encapsulate the change?
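   For reference, a sketch of the encapsulation being discussed 
(`AkkaRpcServiceConfiguration` is the name suggested above; the fields and 
parsing below are assumptions, not the final class):
   
   ```java
   public final class AkkaRpcServiceConfiguration {
   
       private final long maximumFramesize;
   
       private AkkaRpcServiceConfiguration(long maximumFramesize) {
           this.maximumFramesize = maximumFramesize;
       }
   
       public long getMaximumFramesize() {
           return maximumFramesize;
       }
   
       public static AkkaRpcServiceConfiguration fromConfiguration(Configuration configuration) {
           // AkkaOptions.FRAMESIZE is a size string such as "10485760b";
           // MemorySize understands the unit suffix.
           long maximumFramesize =
               MemorySize.parse(configuration.getString(AkkaOptions.FRAMESIZE)).getBytes();
           return new AkkaRpcServiceConfiguration(maximumFramesize);
       }
   }
   ```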


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Handle oversized response messages in AkkaRpcActor
> --
>
> Key: FLINK-10251
> URL: https://issues.apache.org/jira/browse/FLINK-10251
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The {{AkkaRpcActor}} should check that an RPC response sent to a remote 
> sender does not exceed the maximum framesize of the underlying 
> {{ActorSystem}}. If it does, we should fail fast instead. We can achieve 
> this by serializing the response and sending the serialized byte array.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6876: [FLINK-10251] Handle oversized response messages in AkkaRpcActor

2018-10-18 Thread GitBox
yanghua commented on a change in pull request #6876: [FLINK-10251] Handle 
oversized response messages in AkkaRpcActor
URL: https://github.com/apache/flink/pull/6876#discussion_r226366888
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 ##
 @@ -108,9 +111,11 @@
 
private volatile boolean stopped;
 
-   public AkkaRpcService(final ActorSystem actorSystem, final Time 
timeout) {
+   public AkkaRpcService(final ActorSystem actorSystem, final 
Configuration configuration) {
 
 Review comment:
   Sorry, I forgot to refactor this code segment:
   
   ```
   if (actorSystem.settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
       maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
   } else {
       // only local communication
       maximumFramesize = Long.MAX_VALUE;
   }
   ```
   
   to use
   
   ```
   configuration.getString(AkkaOptions.FRAMESIZE);
   ```
   
   (this suggestion is from @zentol)
   
   If I do this, I think it's better to consolidate the scattered config items 
into one Configuration object so that we can encapsulate the change?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655481#comment-16655481
 ] 

ASF GitHub Bot commented on FLINK-10436:


TisonKun commented on a change in pull request #6872: [FLINK-10436] Add 
ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#discussion_r226366382
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
 ##
 @@ -701,12 +701,14 @@ public boolean contains(ConfigOption configOption) {
if (this.confData.containsKey(configOption.key())) {
return true;
}
-   else if (configOption.hasDeprecatedKeys()) {
-   // try the deprecated keys
-   for (String deprecatedKey : 
configOption.deprecatedKeys()) {
-   if 
(this.confData.containsKey(deprecatedKey)) {
-   LOG.warn("Config uses 
deprecated configuration key '{}' instead of proper key '{}'",
-   deprecatedKey, 
configOption.key());
+   else if (configOption.hasFallbackKeys()) {
+   // try the fallback keys
+   for (FallbackKey fallbackKey : 
configOption.fallbackKeys()) {
+   if 
(this.confData.containsKey(fallbackKey.getKey())) {
+   if (fallbackKey.isDeprecated()) 
{
+   LOG.warn("Config uses 
deprecated configuration key '{}' instead of proper key '{}'",
+   
fallbackKey.getKey(), configOption.key());
 
 Review comment:
   Oh, I see the repetition; will do as suggested, sorry :P


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Example config uses deprecated key jobmanager.rpc.address
> -
>
> Key: FLINK-10436
> URL: https://issues.apache.org/jira/browse/FLINK-10436
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The example {{flink-conf.yaml}} shipped as part of the Flink distribution 
> (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml)
>  has the following entry:
> {code}
> jobmanager.rpc.address: localhost
> {code}
> When using this key, the following deprecation warning is logged.
> {code}
> 2018-09-26 12:01:46,608 WARN  org.apache.flink.configuration.Configuration
>   - Config uses deprecated configuration key 
> 'jobmanager.rpc.address' instead of proper key 'rest.address'
> {code}
> The example config should not use deprecated config options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on a change in pull request #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys

2018-10-18 Thread GitBox
TisonKun commented on a change in pull request #6872: [FLINK-10436] Add 
ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#discussion_r226366382
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
 ##
 @@ -701,12 +701,14 @@ public boolean contains(ConfigOption configOption) {
if (this.confData.containsKey(configOption.key())) {
return true;
}
-   else if (configOption.hasDeprecatedKeys()) {
-   // try the deprecated keys
-   for (String deprecatedKey : 
configOption.deprecatedKeys()) {
-   if 
(this.confData.containsKey(deprecatedKey)) {
-   LOG.warn("Config uses 
deprecated configuration key '{}' instead of proper key '{}'",
-   deprecatedKey, 
configOption.key());
+   else if (configOption.hasFallbackKeys()) {
+   // try the fallback keys
+   for (FallbackKey fallbackKey : 
configOption.fallbackKeys()) {
+   if 
(this.confData.containsKey(fallbackKey.getKey())) {
+   if (fallbackKey.isDeprecated()) 
{
+   LOG.warn("Config uses 
deprecated configuration key '{}' instead of proper key '{}'",
+   
fallbackKey.getKey(), configOption.key());
 
 Review comment:
   Oh, I see the repetition; will do as suggested, sorry :P


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655473#comment-16655473
 ] 

ASF GitHub Bot commented on FLINK-10436:


TisonKun commented on a change in pull request #6872: [FLINK-10436] Add 
ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#discussion_r226364750
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
 ##
 @@ -701,12 +701,14 @@ public boolean contains(ConfigOption configOption) {
if (this.confData.containsKey(configOption.key())) {
return true;
}
-   else if (configOption.hasDeprecatedKeys()) {
-   // try the deprecated keys
-   for (String deprecatedKey : 
configOption.deprecatedKeys()) {
-   if 
(this.confData.containsKey(deprecatedKey)) {
-   LOG.warn("Config uses 
deprecated configuration key '{}' instead of proper key '{}'",
-   deprecatedKey, 
configOption.key());
+   else if (configOption.hasFallbackKeys()) {
+   // try the fallback keys
+   for (FallbackKey fallbackKey : 
configOption.fallbackKeys()) {
+   if 
(this.confData.containsKey(fallbackKey.getKey())) {
+   if (fallbackKey.isDeprecated()) 
{
+   LOG.warn("Config uses 
deprecated configuration key '{}' instead of proper key '{}'",
+   
fallbackKey.getKey(), configOption.key());
 
 Review comment:
   Does it mean extracting the LOG statement into one method like below?
   
   ```java
   private void warningDeprecated(...) {
       LOG.warn(...);
   }
   ```
   
   I wonder what value it provides, or whether it is just a style convention 
to follow. I don't find another code block where a method just logs one 
statement.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Example config uses deprecated key jobmanager.rpc.address
> -
>
> Key: FLINK-10436
> URL: https://issues.apache.org/jira/browse/FLINK-10436
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The example {{flink-conf.yaml}} shipped as part of the Flink distribution 
> (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml)
>  has the following entry:
> {code}
> jobmanager.rpc.address: localhost
> {code}
> When using this key, the following deprecation warning is logged.
> {code}
> 2018-09-26 12:01:46,608 WARN  org.apache.flink.configuration.Configuration
>   - Config uses deprecated configuration key 
> 'jobmanager.rpc.address' instead of proper key 'rest.address'
> {code}
> The example config should not use deprecated config options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on a change in pull request #6872: [FLINK-10436] Add ConfigOption#withFallbackKeys

2018-10-18 Thread GitBox
TisonKun commented on a change in pull request #6872: [FLINK-10436] Add 
ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#discussion_r226364750
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
 ##
 @@ -701,12 +701,14 @@ public boolean contains(ConfigOption configOption) {
if (this.confData.containsKey(configOption.key())) {
return true;
}
-   else if (configOption.hasDeprecatedKeys()) {
-   // try the deprecated keys
-   for (String deprecatedKey : 
configOption.deprecatedKeys()) {
-   if 
(this.confData.containsKey(deprecatedKey)) {
-   LOG.warn("Config uses 
deprecated configuration key '{}' instead of proper key '{}'",
-   deprecatedKey, 
configOption.key());
+   else if (configOption.hasFallbackKeys()) {
+   // try the fallback keys
+   for (FallbackKey fallbackKey : 
configOption.fallbackKeys()) {
+   if 
(this.confData.containsKey(fallbackKey.getKey())) {
+   if (fallbackKey.isDeprecated()) 
{
+   LOG.warn("Config uses 
deprecated configuration key '{}' instead of proper key '{}'",
+   
fallbackKey.getKey(), configOption.key());
 
 Review comment:
   Does it mean extracting the LOG statement into one method like below?
   
   ```java
   private void warningDeprecated(...) {
       LOG.warn(...);
   }
   ```
   
   I wonder what value it provides, or whether it is just a style convention 
to follow. I don't find another code block where a method just logs one 
statement.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655468#comment-16655468
 ] 

ASF GitHub Bot commented on FLINK-10436:


TisonKun commented on a change in pull request #6872: [FLINK-10436] Add 
ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#discussion_r226363659
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/FallbackKey.java
 ##
 @@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A key with FallbackKeys will fall back to the FallbackKeys if it itself is 
not configured.
+ */
+public class FallbackKey {
+
+   private final String key;
+
+   private final boolean isDeprecated;
+
+   public String getKey() {
+   return key;
+   }
+
+   public boolean isDeprecated() {
+   return isDeprecated;
+   }
+
+   FallbackKey(String key, boolean isDeprecated) {
 
 Review comment:
   Sounds great.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Example config uses deprecated key jobmanager.rpc.address
> -
>
> Key: FLINK-10436
> URL: https://issues.apache.org/jira/browse/FLINK-10436
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The example {{flink-conf.yaml}} shipped as part of the Flink distribution 
> (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml)
>  has the following entry:
> {code}
> jobmanager.rpc.address: localhost
> {code}
> When using this key, the following deprecation warning is logged.
> {code}
> 2018-09-26 12:01:46,608 WARN  org.apache.flink.configuration.Configuration
>   - Config uses deprecated configuration key 
> 'jobmanager.rpc.address' instead of proper key 'rest.address'
> {code}
> The example config should not use deprecated config options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655470#comment-16655470
 ] 

ASF GitHub Bot commented on FLINK-10436:


TisonKun commented on a change in pull request #6872: [FLINK-10436] Add 
ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#discussion_r226363901
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
 ##
 @@ -701,12 +701,14 @@ public boolean contains(ConfigOption<?> configOption) {
 		if (this.confData.containsKey(configOption.key())) {
 			return true;
 		}
-		else if (configOption.hasDeprecatedKeys()) {
-			// try the deprecated keys
-			for (String deprecatedKey : configOption.deprecatedKeys()) {
-				if (this.confData.containsKey(deprecatedKey)) {
-					LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
-						deprecatedKey, configOption.key());
+		else if (configOption.hasFallbackKeys()) {
+			// try the fallback keys
+			for (FallbackKey fallbackKey : configOption.fallbackKeys()) {
+				if (this.confData.containsKey(fallbackKey.getKey())) {
+					if (fallbackKey.isDeprecated()) {
 
 Review comment:
   agree


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Example config uses deprecated key jobmanager.rpc.address
> -
>
> Key: FLINK-10436
> URL: https://issues.apache.org/jira/browse/FLINK-10436
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The example {{flink-conf.yaml}} shipped as part of the Flink distribution 
> (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml)
>  has the following entry:
> {code}
> jobmanager.rpc.address: localhost
> {code}
> When using this key, the following deprecation warning is logged.
> {code}
> 2018-09-26 12:01:46,608 WARN  org.apache.flink.configuration.Configuration
>   - Config uses deprecated configuration key 
> 'jobmanager.rpc.address' instead of proper key 'rest.address'
> {code}
> The example config should not use deprecated config options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-10252) Handle oversized metric messages

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655451#comment-16655451
 ] 

ASF GitHub Bot commented on FLINK-10252:


yanghua commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messages
URL: https://github.com/apache/flink/pull/6850#discussion_r226358701
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 ##
 @@ -124,50 +124,86 @@ public MetricSerializationResult serialize(
 			Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
 			Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
 			Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-			Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+			Map<Meter, Tuple2<QueryScopeInfo, String>> meters,
+			long maximumFramesize,
+			MetricQueryService queryService) {
 
 		buffer.clear();
+		boolean unregisterRemainingMetrics = false;
 
 		int numCounters = 0;
 		for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
+			if (unregisterRemainingMetrics) {
+				queryService.unregister(entry.getKey());
+				continue;
+			}
+
 			try {
 				serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 				numCounters++;
+				if (buffer.length() > maximumFramesize) {
+					unregisterRemainingMetrics = true;
+				}
 			} catch (Exception e) {
 				LOG.debug("Failed to serialize counter.", e);
 			}
 		}
 
 		int numGauges = 0;
 		for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
+			if (unregisterRemainingMetrics) {
+				queryService.unregister(entry.getKey());
+				continue;
+			}
+
 			try {
 				serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 				numGauges++;
+				if (buffer.length() > maximumFramesize) {
+					unregisterRemainingMetrics = true;
+				}
 			} catch (Exception e) {
 				LOG.debug("Failed to serialize gauge.", e);
 			}
 		}
 
-		int numHistograms = 0;
-		for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
-			try {
-				serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
-				numHistograms++;
-			} catch (Exception e) {
-				LOG.debug("Failed to serialize histogram.", e);
-			}
-		}
-
 		int numMeters = 0;
 		for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) {
+			if (unregisterRemainingMetrics) {
+				queryService.unregister(entry.getKey());
+				continue;
+			}
+
 			try {
 				serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 				numMeters++;
+				if (buffer.length() > maximumFramesize) {
+					unregisterRemainingMetrics = true;
+				}
 			} catch (Exception e) {
 				LOG.debug("Failed to serialize meter.", e);
 			}
 		}
 
+		int numHistograms = 0;
+		for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
+			if (unregisterRemainingMetrics) {
+				queryService.unregister(entry.getKey());
+				continue;
+			}
+
+			try {
+				serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+				numHistograms++;
+				if (buffer.length() > maximumFramesize) {
 
 Review comment:
   OK, I will try to separate the byte array.

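A sketch of the "separate byte array" idea under discussion (hypothetical, not the 
merged implementation; it assumes `DataOutputSerializer` offers `clear()`, `length()` 
and `getCopyOfBuffer()`): serialize each metric into its own scratch buffer first, so 
an entry that would push the dump over the frame size can be skipped and unregistered 
without corrupting the shared output buffer.

```java
// Hypothetical per-metric serialization with a size guard.
DataOutputSerializer scratch = new DataOutputSerializer(4096);

for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
	try {
		scratch.clear();
		serializeCounter(scratch, entry.getValue().f0, entry.getValue().f1, entry.getKey());
		byte[] serialized = scratch.getCopyOfBuffer();
		if (buffer.length() + serialized.length > maximumFramesize) {
			// Dropping and unregistering this metric keeps the dump under the frame size.
			queryService.unregister(entry.getKey());
			continue;
		}
		buffer.write(serialized);
		numCounters++;
	} catch (Exception e) {
		LOG.debug("Failed to serialize counter.", e);
	}
}
```
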
[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655450#comment-16655450
 ] 

ASF GitHub Bot commented on FLINK-10436:


TisonKun commented on issue #6872: [FLINK-10436] Add 
ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#issuecomment-431058218
 
 
   The reason we need a fallback key is that keyA and keyB (with keyA falling back 
to keyB) are conceptually not the same thing, or keyB succeeds keyA. It is just that, 
in this context, we use the configured keyB as a default (fallback) value for keyA. 
And I agree that if a key's semantics change, we should update the fallback 
dependency on demand.
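
To make the keyA/keyB relationship concrete, a minimal sketch against the API this PR 
adds (the option and key names are purely illustrative):

```java
// keyA is the proper key; keyB is only consulted when keyA is absent.
ConfigOption<String> optionA = ConfigOptions
	.key("keyA")
	.noDefaultValue()
	.withFallbackKeys("keyB"); // added by this PR

Configuration conf = new Configuration();
conf.setString("keyB", "value-from-B");

// keyA is not configured, so the configured keyB serves as its fallback.
String value = conf.getString(optionA); // returns "value-from-B"
```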


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Example config uses deprecated key jobmanager.rpc.address
> -
>
> Key: FLINK-10436
> URL: https://issues.apache.org/jira/browse/FLINK-10436
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The example {{flink-conf.yaml}} shipped as part of the Flink distribution 
> (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml)
>  has the following entry:
> {code}
> jobmanager.rpc.address: localhost
> {code}
> When using this key, the following deprecation warning is logged.
> {code}
> 2018-09-26 12:01:46,608 WARN  org.apache.flink.configuration.Configuration
>   - Config uses deprecated configuration key 
> 'jobmanager.rpc.address' instead of proper key 'rest.address'
> {code}
> The example config should not use deprecated config options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655447#comment-16655447
 ] 

ASF GitHub Bot commented on FLINK-10527:


yanghua commented on a change in pull request #6816: [FLINK-10527] Cleanup 
constant isNewMode in YarnTestBase
URL: https://github.com/apache/flink/pull/6816#discussion_r226357341
 
 

 ##
 File path: 
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
 ##
 @@ -189,9 +189,9 @@ public void perJobYarnClusterOffHeap() throws IOException {
 	/**
 	 * Test TaskManager failure and also if the vcores are set correctly (see issue FLINK-2213).
 	 */
+	@Ignore
 	@Test(timeout = 100000) // timeout after 100 seconds
 	public void testTaskManagerFailure() throws Exception {
-		assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
 
 Review comment:
   It's here: https://api.travis-ci.org/v3/job/439612829/log.txt . It seems 
there is another reason, but this error message is logged in the log file.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> This seems to be a residual problem with FLINK-10396. It is set to true in 
> that PR. Currently it has three usage scenarios:
> 1. an assertion, which caused an error
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. if (!isNewMode): the logic in the block would never be invoked, so the if 
> block can be removed
> 3. if (isNewMode): always invoked, so the if statement can be removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-10436) Example config uses deprecated key jobmanager.rpc.address

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655437#comment-16655437
 ] 

ASF GitHub Bot commented on FLINK-10436:


tillrohrmann commented on issue #6872: [FLINK-10436] Add 
ConfigOption#withFallbackKeys
URL: https://github.com/apache/flink/pull/6872#issuecomment-431055291
 
 
   The idea is that some options can default to some other option if they are 
not specified but the other option is. Here, concretely, `rest.address` defaults 
to `jobmanager.rpc.address`. This of course only makes sense in the current 
setup, where the `RestServerEndpoint` runs in the same process as the 
`Dispatcher`. In the future, this might change and could require removing the 
fallback dependency, depending on what the default behaviour is.
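
Concretely, the REST address option could then be declared along these lines (a 
sketch; the actual definition in `RestOptions` may differ):

```java
// rest.address falls back to jobmanager.rpc.address without a deprecation
// warning, since the latter remains a proper key in its own right.
public static final ConfigOption<String> ADDRESS = ConfigOptions
	.key("rest.address")
	.noDefaultValue()
	.withFallbackKeys("jobmanager.rpc.address");
```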


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Example config uses deprecated key jobmanager.rpc.address
> -
>
> Key: FLINK-10436
> URL: https://issues.apache.org/jira/browse/FLINK-10436
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Affects Versions: 1.7.0
>Reporter: Ufuk Celebi
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> The example {{flink-conf.yaml}} shipped as part of the Flink distribution 
> (https://github.com/apache/flink/blob/master/flink-dist/src/main/resources/flink-conf.yaml)
>  has the following entry:
> {code}
> jobmanager.rpc.address: localhost
> {code}
> When using this key, the following deprecation warning is logged.
> {code}
> 2018-09-26 12:01:46,608 WARN  org.apache.flink.configuration.Configuration
>   - Config uses deprecated configuration key 
> 'jobmanager.rpc.address' instead of proper key 'rest.address'
> {code}
> The example config should not use deprecated config options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655434#comment-16655434
 ] 

ASF GitHub Bot commented on FLINK-10384:


yanghua commented on issue #6730: [FLINK-10384][table] Add Sinh math function 
supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730#issuecomment-431054636
 
 
   @pnowojski  I have rebased this PR. You are right that functions belonging to one 
category should be put into one PR. Next time, if I contribute built-in functions 
again, I will follow this rule.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Sinh math function supported in Table API and SQL
> -
>
> Key: FLINK-10384
> URL: https://issues.apache.org/jira/browse/FLINK-10384
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> like FLINK-10340 for adding Cosh math function



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10508) Port JobManagerITCase to new code base

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655432#comment-16655432
 ] 

ASF GitHub Bot commented on FLINK-10508:


TisonKun commented on issue #6834: [FLINK-10508] [tests] Port JobManagerITCase 
to new code base
URL: https://github.com/apache/flink/pull/6834#issuecomment-431054374
 
 
   Thanks for your review and answers! @tillrohrmann 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Port JobManagerITCase to new code base
> --
>
> Key: FLINK-10508
> URL: https://issues.apache.org/jira/browse/FLINK-10508
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{JobManagerITCase}} to new code base.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

