[GitHub] yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges

2018-10-16 Thread GitBox
yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-430124087
 
 
   @zentol What about this change?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651237#comment-16651237
 ] 

ASF GitHub Bot commented on FLINK-10252:


yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-430124087
 
 
   @zentol What about this change?




> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages that are smaller than the current {{akka.framesize}}. We 
> should check, as in FLINK-10251, whether the payload exceeds the maximum 
> frame size and fail fast if it does.
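
A minimal sketch of the fail-fast check described above, assuming the serialized payload is available as a byte array and the limit is known in bytes. The names `FrameSizeGuard` and `checkPayloadSize` are hypothetical and are not part of Flink or Akka; whether the boundary itself is allowed is also an assumption.

```java
/** Hypothetical helper for illustration; not part of Flink or Akka. */
final class FrameSizeGuard {

    private FrameSizeGuard() {}

    /**
     * Returns the payload unchanged if it fits into one frame,
     * otherwise fails fast with a descriptive exception.
     *
     * @param payload serialized message bytes
     * @param maxFrameSizeBytes limit in bytes, assumed to mirror akka.framesize
     */
    static byte[] checkPayloadSize(byte[] payload, long maxFrameSizeBytes) {
        if (payload.length > maxFrameSizeBytes) {
            throw new IllegalStateException(
                "Payload of " + payload.length
                    + " bytes exceeds the maximum frame size of "
                    + maxFrameSizeBytes + " bytes.");
        }
        return payload;
    }
}
```

Failing before the send means the sender sees an explicit error instead of a message that never arrives.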



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua opened a new pull request #6854: [FLINK-10547] Remove LegacyCLI

2018-10-16 Thread GitBox
yanghua opened a new pull request #6854: [FLINK-10547] Remove LegacyCLI
URL: https://github.com/apache/flink/pull/6854
 
 
   ## What is the purpose of the change
   
   *This pull request removes LegacyCLI*
   
   ## Brief change log
   
 - *Remove LegacyCLI*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   




[GitHub] yanghua commented on issue #6854: [FLINK-10547] Remove LegacyCLI

2018-10-16 Thread GitBox
yanghua commented on issue #6854: [FLINK-10547] Remove LegacyCLI
URL: https://github.com/apache/flink/pull/6854#issuecomment-430129477
 
 
   cc @tillrohrmann @zentol 




[jira] [Updated] (FLINK-10547) Remove LegacyCLI

2018-10-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10547:
---
Labels: pull-request-available  (was: )

> Remove LegacyCLI
> 
>
> Key: FLINK-10547
> URL: https://issues.apache.org/jira/browse/FLINK-10547
> Project: Flink
>  Issue Type: Sub-task
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>






[jira] [Commented] (FLINK-10547) Remove LegacyCLI

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651254#comment-16651254
 ] 

ASF GitHub Bot commented on FLINK-10547:


yanghua opened a new pull request #6854: [FLINK-10547] Remove LegacyCLI
URL: https://github.com/apache/flink/pull/6854
 
 
   ## What is the purpose of the change
   
   *This pull request removes LegacyCLI*
   
   ## Brief change log
   
 - *Remove LegacyCLI*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   




> Remove LegacyCLI
> 
>
> Key: FLINK-10547
> URL: https://issues.apache.org/jira/browse/FLINK-10547
> Project: Flink
>  Issue Type: Sub-task
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>






[jira] [Commented] (FLINK-10547) Remove LegacyCLI

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651255#comment-16651255
 ] 

ASF GitHub Bot commented on FLINK-10547:


yanghua commented on issue #6854: [FLINK-10547] Remove LegacyCLI
URL: https://github.com/apache/flink/pull/6854#issuecomment-430129477
 
 
   cc @tillrohrmann @zentol 




> Remove LegacyCLI
> 
>
> Key: FLINK-10547
> URL: https://issues.apache.org/jira/browse/FLINK-10547
> Project: Flink
>  Issue Type: Sub-task
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>






[GitHub] yanghua commented on issue #6770: [FLINK-10002] [Webfrontend] WebUI shows jm/tm logs more friendly.

2018-10-16 Thread GitBox
yanghua commented on issue #6770:  [FLINK-10002] [Webfrontend] WebUI shows 
jm/tm logs more friendly.
URL: https://github.com/apache/flink/pull/6770#issuecomment-430130122
 
 
   @zhangxinyu1 this PR has conflicting files; please update and rebase it. I 
will try to take a look before Till does.




[jira] [Commented] (FLINK-10002) WebUI shows logs unfriendly, especially when the amount of logs is large

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651256#comment-16651256
 ] 

ASF GitHub Bot commented on FLINK-10002:


yanghua commented on issue #6770:  [FLINK-10002] [Webfrontend] WebUI shows 
jm/tm logs more friendly.
URL: https://github.com/apache/flink/pull/6770#issuecomment-430130122
 
 
   @zhangxinyu1 this PR has conflicting files; please update and rebase it. I 
will try to take a look before Till does.




> WebUI shows logs unfriendly, especially when the amount of logs is large
> 
>
> Key: FLINK-10002
> URL: https://issues.apache.org/jira/browse/FLINK-10002
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: zhangxinyu
>Assignee: zhangxinyu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-09-10-11-38-07-973.png
>
>
> When a streaming job runs for a long time, the amount of logs may be very 
> large. The current WebUI shows the entire content of the logs, so it takes a 
> long time to download logs from the task managers, and the browser cannot 
> display them.
> Therefore, I suggest that Flink use DailyRollingAppender to split logs by 
> default, and that the task manager provide an API that returns logs for a 
> given time interval. This way the WebUI can display logs by time 
> interval.
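
A rough sketch of the time-interval lookup suggested above, assuming each log entry starts with an ISO-8601 local timestamp in its first 19 characters (for example `2018-10-16T10:00:00`). The class `LogIntervalFilter` and its method are hypothetical; this is not an existing Flink API, and Flink's real log format may differ.

```java
/** Hypothetical illustration; not an existing Flink API. */
final class LogIntervalFilter {

    private LogIntervalFilter() {}

    /** Returns the lines whose entry timestamp lies in [fromInclusive, toExclusive). */
    static java.util.List<String> linesBetween(
            java.util.List<String> lines,
            java.time.LocalDateTime fromInclusive,
            java.time.LocalDateTime toExclusive) {
        java.util.List<String> result = new java.util.ArrayList<>();
        // Decision for the most recent timestamped line; continuation lines
        // (such as stack traces) inherit it.
        boolean keep = false;
        for (String line : lines) {
            java.time.LocalDateTime ts = parseTimestamp(line);
            if (ts != null) {
                keep = !ts.isBefore(fromInclusive) && ts.isBefore(toExclusive);
            }
            if (keep) {
                result.add(line);
            }
        }
        return result;
    }

    /** Parses the assumed 19-character timestamp prefix, or returns null. */
    private static java.time.LocalDateTime parseTimestamp(String line) {
        if (line.length() < 19) {
            return null;
        }
        try {
            return java.time.LocalDateTime.parse(line.substring(0, 19));
        } catch (java.time.format.DateTimeParseException e) {
            return null;
        }
    }
}
```

With daily log rolling, a lookup like this would only need to read the files whose date falls inside the requested interval.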





[GitHub] yanghua commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka

2018-10-16 Thread GitBox
yanghua commented on issue #6703: [FLINK-9697] Provide connector for modern 
Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-430130606
 
 
   Is there anything else I need to do about this PR?




[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651258#comment-16651258
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

yanghua commented on issue #6703: [FLINK-9697] Provide connector for modern 
Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-430130606
 
 
   Is there anything else I need to do about this PR?




> Provide connector for modern Kafka
> --
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Kafka 2.0.0 will be released soon.
> Here is the vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide a connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation: 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0





[jira] [Created] (FLINK-10561) Flink doesn't start locally if remove log package

2018-10-16 Thread Aleksei Starikov (JIRA)
Aleksei Starikov created FLINK-10561:


 Summary: Flink doesn't start locally if remove log package
 Key: FLINK-10561
 URL: https://issues.apache.org/jira/browse/FLINK-10561
 Project: Flink
  Issue Type: Bug
  Components: Startup Shell Scripts
Affects Versions: 1.6.1
 Environment: windows 7
Reporter: Aleksei Starikov
 Attachments: start-cluster_without_log_package.png

Download Flink from 
[https://www.apache.org/dyn/closer.lua/flink/flink-1.6.1/flink-1.6.1-bin-scala_2.11.tgz].

Unzip the archive.

Remove the log package.

Try to start a local cluster with the command .\bin\start-cluster.bat

Flink doesn't start and prints the following messages:

 

D:\work\prog\flink-1.6.1>.\bin\start-cluster.bat

Системе не удается найти указанный путь.

Starting a local cluster with one JobManager process and one TaskManager 
process.

You can terminate the processes via CTRL-C in the spawned shell windows.

Web interface by default on [http://localhost:8081/].

Системе не удается найти указанный путь.

Системе не удается найти указанный путь.

 

In Russian, "Системе не удается найти указанный путь" means "The system cannot 
find the path specified".

Flink could create this package itself or print a more meaningful message.
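
A small sketch of the suggested behaviour, assuming the "log package" is the `log` directory under the Flink home directory. The real startup logic lives in `start-cluster.bat`, so this Java helper (`LogDirectory.ensureExists`, a hypothetical name) only illustrates the idea of creating the directory or reporting a clear error.

```java
/** Hypothetical illustration; the real startup logic is a batch script. */
final class LogDirectory {

    private LogDirectory() {}

    /**
     * Creates the assumed "log" directory under the given home directory
     * if it is missing and returns its path.
     */
    static java.nio.file.Path ensureExists(java.nio.file.Path flinkHome) {
        java.nio.file.Path logDir = flinkHome.resolve("log");
        try {
            // No-op if the directory already exists.
            return java.nio.file.Files.createDirectories(logDir);
        } catch (java.io.IOException e) {
            // A clearer message than "The system cannot find the path specified".
            throw new java.io.UncheckedIOException(
                "Cannot create log directory " + logDir.toAbsolutePath()
                    + "; please create it manually or check permissions.",
                e);
        }
    }
}
```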





[jira] [Commented] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-10-16 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651276#comment-16651276
 ] 

Shimin Yang commented on FLINK-10540:
-

Hi [~Tison], I would love to take this if you haven't started working on it.

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.7.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.





[GitHub] aljoscha commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka

2018-10-16 Thread GitBox
aljoscha commented on issue #6703: [FLINK-9697] Provide connector for modern 
Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-430139529
 
 
   @yanghua You don't have to do anything more, I'll merge shortly.




[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651280#comment-16651280
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

aljoscha commented on issue #6703: [FLINK-9697] Provide connector for modern 
Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-430139529
 
 
   @yanghua You don't have to do anything more, I'll merge shortly.




> Provide connector for modern Kafka
> --
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Kafka 2.0.0 will be released soon.
> Here is the vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide a connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation: 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0





[GitHub] dianfu commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE

2018-10-16 Thread GitBox
dianfu commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225410002
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import java.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.cep.pattern.conditions.IterativeCondition
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, 
newName, primitiveDefaultValue}
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.util.Collector
+import org.apache.flink.util.MathUtils.checkedDownCast
+import java.lang.{Long => JLong}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A code generator for generating CEP related functions.
+  *
+  * @param config configuration that determines runtime behavior
+  * @param nullableInput input(s) can be null.
+  * @param input type information about the first input of the Function
+  * @param currentPattern if generating condition the name of pattern, which 
the condition will
+  *   be applied to
+  */
+class MatchCodeGenerator(
+config: TableConfig,
+nullableInput: Boolean,
+input: TypeInformation[_ <: Any],
+currentPattern: Option[String] = None)
+  extends CodeGenerator(config, nullableInput, input){
+
+  def generateMatchFunction[F <: Function, T <: Any](
+name: String,
+clazz: Class[F],
+bodyCode: String,
+returnType: TypeInformation[T])
+  : GeneratedFunction[F, T] = {
+// This is a separate method from FunctionCodeGenerator#generateFunction 
because as of now
+// functions in CEP library do not support rich interfaces
+val funcName = newName(name)
+val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName
+val (functionClass, signature, inputStatements, isInterface) =
+  if (clazz == classOf[IterativeCondition[_]]) {
+val baseClass = classOf[IterativeCondition[_]]
+val inputTypeTerm = boxedTypeTermForTypeInfo(input)
+val contextType = 
classOf[IterativeCondition.Context[_]].getCanonicalName
+
+(baseClass,
+  s"boolean filter( Object _in1, $contextType $contextTerm)",
+  List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"),
+  false)
+  } else if (clazz == classOf[PatternSelectFunction[_, _]]) {
+val baseClass = classOf[PatternSelectFunction[_, _]]
+val inputTypeTerm =
+  s"java.util.Map<String, java.util.List<${boxedTypeTermForTypeInfo(input)}>>"
+
+(baseClass,
+  s"Object select($inputTypeTerm $input1Term)",
+  List(),
+  true)
+  } else if (clazz == classOf[PatternFlatSelectFunction[_, _]]) {
+val baseClass = classOf[PatternFlatSelectFunction[_, _]]
+val inputTypeTerm =
+  s"java.util.Map<String, java.util.List<${boxedTypeTermForTypeInfo(input)}>>"
+
+(baseClass,
+  s"void flatSelect($inputTypeTerm $input1Term, $collectorTypeTerm 
$collectorTerm)",
+  List(),
+  true)
+  } else {
+throw new CodeGenException("Unsupported Function.")
+  }
+
+val extendsKeyword = if (isInterface) "implements" else "extends"
+val funcCode = j"""
+  |public class $funcName $extendsKeyword 
${functionClass.getCanonicalName} {
+  |
+  |  ${reuseMemberCode()}
+  |
+  |  public $funcName() throws Exception {
+  |${reuseInitCode()}
+  |  }
+  |
+  |  @Override
+  |  public $signature throws Exception {
+  |${inputStatements.mkString("\n")}
+  |${reusePerRecordCode()}
+  |${reuseInputUnboxingCode()}
+  |$bodyCode
+  |  }
+  |}
+""".str

[GitHub] dianfu commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE

2018-10-16 Thread GitBox
dianfu commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225402418
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
 ##
 @@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.SqlMatchRecognize.AfterOption
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
+import org.apache.flink.cep.nfa.compiler.NFACompiler
+import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty
+import org.apache.flink.cep.pattern.conditions.BooleanConditions
+import org.apache.flink.cep.{CEP, EventComparator, PatternStream}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, 
TableConfig,
+  TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.MatchRecognize
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalMatch
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.runtime.`match`._
+import org.apache.flink.table.runtime.aggregate.SortUtil
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.runtime.{RowKeySelector, RowtimeProcessFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.MathUtils
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with LogicalMatch.
+  */
+class DataStreamMatch(
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  input: RelNode,
+  logicalMatch: FlinkLogicalMatch,
+  schema: RowSchema,
+  inputSchema: RowSchema)
+  extends SingleRel(cluster, traitSet, input)
+with MatchRecognize
+with DataStreamRel {
+
+  private[flink] def getLogicalMatch = logicalMatch
+
+  override def deriveRowType(): RelDataType = schema.relDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new DataStreamMatch(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  logicalMatch,
+  schema,
+  inputSchema)
+  }
+
+  override def toString: String = {
+matchToString(logicalMatch, inputSchema, getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+explainMatch(super.explainTerms(pw), logicalMatch, inputSchema, 
getExpressionString)
+  }
+
+  private def translateOrder(
+tableEnv: StreamTableEnvironment,
+crowInput: DataStream[CRow],
+orderKeys: RelCollation) = {
+
+if (orderKeys.getFieldCollations.size() == 0) {
+  throw new TableException("You must specify either rowtime or proctime 
for order by.")
+}
+
+// need to identify time between others order fields. Time needs to be 
first sort element
+val timeOrderField = SortUtil.getFirstSortField(orderKeys, 
inputSchema.relDataType)
+
+if (!FlinkTypeFactory.isTimeIndicatorType(timeOrderField.getType)) {
+  throw new TableException(
+"You must specify either rowtime or proctime for order by as the first 
one.")
+}
+
+// time ordering needs to be ascending
+if (SortUtil.getFirstSortDirection(orderKeys) != Direction.ASCENDING) {
+  throw new TableException("Primary sort order of a streaming table must 
be ascending on time.")
+}
+
+val rowComparator = if (orderKeys.getFieldCollations.size() > 1) {
+  Some(SortUtil
+.createRowCompara

[GitHub] dianfu commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE

2018-10-16 Thread GitBox
dianfu commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225404650
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/MatchUtil.scala
 ##
 @@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.`match`
+
+import java.util
+
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.pattern.conditions.IterativeCondition
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction}
 
 Review comment:
   unused import




[GitHub] dianfu commented on a change in pull request #6815: [FLINK-7062][cep][table] Added basic support for MATCH_RECOGNIZE

2018-10-16 Thread GitBox
dianfu commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225412865
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamMatch
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalMatch
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.aggregate.SortUtil
+
+class DataStreamMatchRule
+  extends ConverterRule(
+classOf[FlinkLogicalMatch],
+FlinkConventions.LOGICAL,
+FlinkConventions.DATASTREAM,
+"DataStreamMatchRule") {
+
+
 
 Review comment:
   too many empty lines.




[jira] [Commented] (FLINK-7062) Support the basic functionality of MATCH_RECOGNIZE

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651285#comment-16651285
 ] 

ASF GitHub Bot commented on FLINK-7062:
---

dianfu commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225404650
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/match/MatchUtil.scala
 ##
 @@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.`match`
+
+import java.util
+
+import org.apache.calcite.rel.RelFieldCollation
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.pattern.conditions.IterativeCondition
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction}
 
 Review comment:
   unused import




> Support the basic functionality of MATCH_RECOGNIZE
> --
>
> Key: FLINK-7062
> URL: https://issues.apache.org/jira/browse/FLINK-7062
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> In this JIRA, we will support the basic functionality of {{MATCH_RECOGNIZE}} 
> in the Flink SQL API, which includes support for the {{MEASURES}}, 
> {{PATTERN}} and {{DEFINE}} clauses. This would allow users to write basic CEP 
> use cases in SQL, as in the following example:
> {code}
> SELECT T.aid, T.bid, T.cid
> FROM MyTable
> MATCH_RECOGNIZE (
>   MEASURES
>     A.id AS aid,
>     B.id AS bid,
>     C.id AS cid
>   PATTERN (A B C)
>   DEFINE
>     A AS A.name = 'a',
>     B AS B.name = 'b',
>     C AS C.name = 'c'
> ) AS T
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7062) Support the basic functionality of MATCH_RECOGNIZE

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651283#comment-16651283
 ] 

ASF GitHub Bot commented on FLINK-7062:
---

dianfu commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225410002
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala
 ##
 @@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import java.util
+
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import org.apache.flink.cep.pattern.conditions.IterativeCondition
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction}
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.codegen.CodeGenUtils.{boxedTypeTermForTypeInfo, 
newName, primitiveDefaultValue}
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.util.Collector
+import org.apache.flink.util.MathUtils.checkedDownCast
+import java.lang.{Long => JLong}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A code generator for generating CEP related functions.
+  *
+  * @param config configuration that determines runtime behavior
+  * @param nullableInput input(s) can be null.
+  * @param input type information about the first input of the Function
+  * @param currentPattern if generating condition the name of pattern, which 
the condition will
+  *   be applied to
+  */
+class MatchCodeGenerator(
+config: TableConfig,
+nullableInput: Boolean,
+input: TypeInformation[_ <: Any],
+currentPattern: Option[String] = None)
+  extends CodeGenerator(config, nullableInput, input){
+
+  def generateMatchFunction[F <: Function, T <: Any](
+name: String,
+clazz: Class[F],
+bodyCode: String,
+returnType: TypeInformation[T])
+  : GeneratedFunction[F, T] = {
+// This is a separate method from FunctionCodeGenerator#generateFunction 
because as of now
+// functions in CEP library do not support rich interfaces
+val funcName = newName(name)
+val collectorTypeTerm = classOf[Collector[Any]].getCanonicalName
+val (functionClass, signature, inputStatements, isInterface) =
+  if (clazz == classOf[IterativeCondition[_]]) {
+val baseClass = classOf[IterativeCondition[_]]
+val inputTypeTerm = boxedTypeTermForTypeInfo(input)
+val contextType = 
classOf[IterativeCondition.Context[_]].getCanonicalName
+
+(baseClass,
+  s"boolean filter( Object _in1, $contextType $contextTerm)",
+  List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"),
+  false)
+  } else if (clazz == classOf[PatternSelectFunction[_, _]]) {
+val baseClass = classOf[PatternSelectFunction[_, _]]
+val inputTypeTerm =
+  s"java.util.Map>"
+
+(baseClass,
+  s"Object select($inputTypeTerm $input1Term)",
+  List(),
+  true)
+  } else if (clazz == classOf[PatternFlatSelectFunction[_, _]]) {
+val baseClass = classOf[PatternFlatSelectFunction[_, _]]
+val inputTypeTerm =
+  s"java.util.Map>"
+
+(baseClass,
+  s"void flatSelect($inputTypeTerm $input1Term, $collectorTypeTerm 
$collectorTerm)",
+  List(),
+  true)
+  } else {
+throw new CodeGenException("Unsupported Function.")
+  }
+
+val extendsKeyword = if (isInterface) "implements" else "extends"
+val funcCode = j"""
+  |public class $funcName $extendsKeyword 
${functionClass.getCanonicalName} {
+  |
+  |  ${reuseMemberCode()}
+  |
+  |  public $funcName() throws Exception {
+  |${reuseInitCode()}
+  | 

[jira] [Commented] (FLINK-7062) Support the basic functionality of MATCH_RECOGNIZE

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651282#comment-16651282
 ] 

ASF GitHub Bot commented on FLINK-7062:
---

dianfu commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225412865
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamMatch
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalMatch
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.aggregate.SortUtil
+
+class DataStreamMatchRule
+  extends ConverterRule(
+classOf[FlinkLogicalMatch],
+FlinkConventions.LOGICAL,
+FlinkConventions.DATASTREAM,
+"DataStreamMatchRule") {
+
+
 
 Review comment:
   too many empty lines.




> Support the basic functionality of MATCH_RECOGNIZE
> --
>
> Key: FLINK-7062
> URL: https://issues.apache.org/jira/browse/FLINK-7062
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> In this JIRA, we will support the basic functionality of {{MATCH_RECOGNIZE}} 
> in the Flink SQL API, which includes support for the {{MEASURES}}, 
> {{PATTERN}} and {{DEFINE}} clauses. This would allow users to write basic CEP 
> use cases in SQL, as in the following example:
> {code}
> SELECT T.aid, T.bid, T.cid
> FROM MyTable
> MATCH_RECOGNIZE (
>   MEASURES
>     A.id AS aid,
>     B.id AS bid,
>     C.id AS cid
>   PATTERN (A B C)
>   DEFINE
>     A AS A.name = 'a',
>     B AS B.name = 'b',
>     C AS C.name = 'c'
> ) AS T
> {code}





[jira] [Commented] (FLINK-7062) Support the basic functionality of MATCH_RECOGNIZE

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651284#comment-16651284
 ] 

ASF GitHub Bot commented on FLINK-7062:
---

dianfu commented on a change in pull request #6815:  [FLINK-7062][cep][table] 
Added basic support for MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/6815#discussion_r225402418
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
 ##
 @@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.SqlMatchRecognize.AfterOption
+import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
+import org.apache.flink.cep.nfa.compiler.NFACompiler
+import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty
+import org.apache.flink.cep.pattern.conditions.BooleanConditions
+import org.apache.flink.cep.{CEP, EventComparator, PatternStream}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, 
TableConfig,
+  TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.MatchRecognize
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalMatch
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+import org.apache.flink.table.runtime.`match`._
+import org.apache.flink.table.runtime.aggregate.SortUtil
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.runtime.{RowKeySelector, RowtimeProcessFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.MathUtils
+
+import _root_.scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with LogicalMatch.
+  */
+class DataStreamMatch(
+  cluster: RelOptCluster,
+  traitSet: RelTraitSet,
+  input: RelNode,
+  logicalMatch: FlinkLogicalMatch,
+  schema: RowSchema,
+  inputSchema: RowSchema)
+  extends SingleRel(cluster, traitSet, input)
+with MatchRecognize
+with DataStreamRel {
+
+  private[flink] def getLogicalMatch = logicalMatch
+
+  override def deriveRowType(): RelDataType = schema.relDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+new DataStreamMatch(
+  cluster,
+  traitSet,
+  inputs.get(0),
+  logicalMatch,
+  schema,
+  inputSchema)
+  }
+
+  override def toString: String = {
+matchToString(logicalMatch, inputSchema, getExpressionString)
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+explainMatch(super.explainTerms(pw), logicalMatch, inputSchema, 
getExpressionString)
+  }
+
+  private def translateOrder(
+tableEnv: StreamTableEnvironment,
+crowInput: DataStream[CRow],
+orderKeys: RelCollation) = {
+
+if (orderKeys.getFieldCollations.size() == 0) {
+  throw new TableException("You must specify either rowtime or proctime 
for order by.")
+}
+
+// need to identify time between others order fields. Time needs to be 
first sort element
+val timeOrderField = SortUtil.getFirstSortField(orderKeys, 
inputSchema.relDataType)
+
+if (!FlinkTypeFactory.isTimeIndicatorType(timeOrderField.getType)) {
+  throw new TableException(
+"You must specify either rowtime or proctime for order by as the first 
one.")
+}
+
+// time ordering needs to be ascending
+if (SortUtil.getFirstSortDirection(orderKeys) 

[jira] [Commented] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-10-16 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651288#comment-16651288
 ] 

TisonKun commented on FLINK-10540:
--

Hi [~dangdangdang], feel free to take over it.

FYI, this is somehow an umbrella issue since there are many dependencies of  
{{FlinkMiniCluster}} its subclasses {{LocalFlinkMiniCluster}} and 
{{TestingMiniCluster}}. We finally remove {{FlinkMiniCluster}} but the process 
might be a bit further than just removal.

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.7.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.





[jira] [Comment Edited] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-10-16 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651288#comment-16651288
 ] 

TisonKun edited comment on FLINK-10540 at 10/16/18 8:04 AM:


Hi [~dangdangdang], feel free to take over it.

FYI, this is somehow an umbrella issue since there are many dependencies of  
{{FlinkMiniCluster}} and its subclasses {{LocalFlinkMiniCluster}} and 
{{TestingCluster}}. We finally remove {{FlinkMiniCluster}} but the process 
might be a bit further than just removal.


was (Author: tison):
Hi [~dangdangdang], feel free to take over it.

FYI, this is somehow an umbrella issue since there are many dependencies of  
{{FlinkMiniCluster}} and its subclasses {{LocalFlinkMiniCluster}} and 
{{TestingMiniCluster}}. We finally remove {{FlinkMiniCluster}} but the process 
might be a bit further than just removal.

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.7.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.





[jira] [Comment Edited] (FLINK-10540) Remove legacy FlinkMiniCluster

2018-10-16 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651288#comment-16651288
 ] 

TisonKun edited comment on FLINK-10540 at 10/16/18 8:04 AM:


Hi [~dangdangdang], feel free to take over it.

FYI, this is somehow an umbrella issue since there are many dependencies of  
{{FlinkMiniCluster}} and its subclasses {{LocalFlinkMiniCluster}} and 
{{TestingMiniCluster}}. We finally remove {{FlinkMiniCluster}} but the process 
might be a bit further than just removal.


was (Author: tison):
Hi [~dangdangdang], feel free to take over it.

FYI, this is somehow an umbrella issue since there are many dependencies of  
{{FlinkMiniCluster}} its subclasses {{LocalFlinkMiniCluster}} and 
{{TestingMiniCluster}}. We finally remove {{FlinkMiniCluster}} but the process 
might be a bit further than just removal.

> Remove legacy FlinkMiniCluster
> --
>
> Key: FLINK-10540
> URL: https://issues.apache.org/jira/browse/FLINK-10540
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Priority: Major
> Fix For: 1.7.0
>
>
> {{FlinkMiniCluster}} is based on the legacy cluster mode and should no longer 
> be used.





[GitHub] luyee commented on issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink writer

2018-10-16 Thread GitBox
luyee commented on issue #6075: [FLINK-9407] [hdfs connector] Support orc 
rolling sink writer
URL: https://github.com/apache/flink/pull/6075#issuecomment-430148205
 
 
   @zhangminglei does this PR work well in production?




[jira] [Commented] (FLINK-9407) Support orc rolling sink writer

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651308#comment-16651308
 ] 

ASF GitHub Bot commented on FLINK-9407:
---

luyee commented on issue #6075: [FLINK-9407] [hdfs connector] Support orc 
rolling sink writer
URL: https://github.com/apache/flink/pull/6075#issuecomment-430148205
 
 
   @zhangminglei does this PR work well in production?




> Support orc rolling sink writer
> ---
>
> Key: FLINK-9407
> URL: https://issues.apache.org/jira/browse/FLINK-9407
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: zhangminglei
>Assignee: zhangminglei
>Priority: Major
>  Labels: pull-request-available
>
> Currently, we only support {{StringWriter}}, {{SequenceFileWriter}} and 
> {{AvroKeyValueSinkWriter}}. I suggest adding an ORC writer for the rolling 
> sink.
> Below, FYI.
> I tested the PR and verified the results with Spark SQL: the data written 
> by the sink can be read back as expected. I will run more tests in the next 
> couple of days, including performance under compression with short 
> checkpoint intervals, and add more unit tests.
> {code:java}
> scala> spark.read.orc("hdfs://10.199.196.0:9000/data/hive/man/2018-07-06--21")
> res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala>
> scala> res1.registerTempTable("tablerice")
> warning: there was one deprecation warning; re-run with -deprecation for 
> details
> scala> spark.sql("select * from tablerice")
> res3: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more 
> field]
> scala> res3.show(3)
> +-----+---+-------+
> | name|age|married|
> +-----+---+-------+
> |Sagar| 26|  false|
> |Sagar| 30|  false|
> |Sagar| 34|  false|
> +-----+---+-------+
> only showing top 3 rows
> {code}





[GitHub] zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225449312
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -109,7 +122,17 @@ public void onReceive(Object message) {
}
} else if (message instanceof CreateDump) {

MetricDumpSerialization.MetricSerializationResult dump = 
serializer.serialize(counters, gauges, histograms, meters);
-   getSender().tell(dump, getSelf());
+
+   int realMsgSize = dump.serializedMetrics.length;
+
+   if (realMsgSize > maximumFramesize) {
+   String overSizeErrorMsg = "The metric 
dump message size : " + realMsgSize
+   + " exceeds the maximum akka 
framesize : " + maximumFramesize + ".";
+   LOG.error(overSizeErrorMsg);
+   getSender().tell(new Status.Failure(new 
IOException(overSizeErrorMsg)), getSelf());
 
 Review comment:
   While better than crashing, a more reasonable option might be to create a new 
dump with a reduced number of metrics.
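
A minimal sketch of the reduced-dump idea, under stated assumptions: the class `ReducedDumpSketch`, its line-based stand-in serializer, and the "drop the lowest-priority category and retry" policy are all hypothetical and are not taken from the PR or from `MetricDumpSerialization`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these names exist in Flink.
final class ReducedDumpSketch {

    // Stand-in serializer: one "name=value\n" line per metric.
    static byte[] serialize(List<Map<String, String>> categories) {
        StringBuilder sb = new StringBuilder();
        for (Map<String, String> category : categories) {
            for (Map.Entry<String, String> metric : category.entrySet()) {
                sb.append(metric.getKey()).append('=').append(metric.getValue()).append('\n');
            }
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    // Tries the full dump first. While the result is larger than maxBytes,
    // drops the last (assumed least important) category and serializes again.
    // Returns an empty dump if not even the first category fits.
    static byte[] createDump(List<Map<String, String>> categoriesByPriority, long maxBytes) {
        List<Map<String, String>> remaining = new ArrayList<>(categoriesByPriority);
        while (!remaining.isEmpty()) {
            byte[] dump = serialize(remaining);
            if (dump.length <= maxBytes) {
                return dump;
            }
            remaining.remove(remaining.size() - 1);
        }
        return new byte[0];
    }
}
```

With this policy the sender would still get a usable (if partial) dump instead of a failure; which categories are safe to drop first is a design decision the sketch does not settle.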




[GitHub] zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225447615
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -70,6 +72,17 @@ public String filterCharacters(String input) {
private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
 
+   private long maximumFramesize;
+
+   @Override
+   public void preStart() throws Exception {
+   if 
(getContext().system().settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
+   maximumFramesize = 
getContext().system().settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
+   } else {
+   maximumFramesize = Long.MAX_VALUE;
 
 Review comment:
   this is not a reasonable default.




[GitHub] zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225448197
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -70,6 +72,17 @@ public String filterCharacters(String input) {
private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
 
+   private long maximumFramesize;
+
+   @Override
+   public void preStart() throws Exception {
+   if 
(getContext().system().settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
 
 Review comment:
   We should rely on `akka.framesize` instead. 
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#akka-framesize
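
A rough sketch of how a frame size given in the `akka.framesize` style could be turned into a byte count with a bounded fallback rather than `Long.MAX_VALUE`. The helper `FrameSizeSketch` is hypothetical, and both the `<number>b` format and the `10485760b` default are assumptions taken from the linked configuration page, not from the PR.

```java
// Hypothetical helper; not part of Flink or Akka.
final class FrameSizeSketch {

    // Assumed default, mirroring the value listed for akka.framesize in the Flink docs.
    static final String DEFAULT_FRAMESIZE = "10485760b";

    // Parses values of the form "<positive number>b" into a byte count.
    // A missing or blank setting falls back to DEFAULT_FRAMESIZE.
    static long parseFrameSizeBytes(String configured) {
        String value = (configured == null || configured.trim().isEmpty())
                ? DEFAULT_FRAMESIZE
                : configured.trim();
        if (!value.endsWith("b")) {
            throw new IllegalArgumentException(
                    "Expected a byte count such as 10485760b, got: " + value);
        }
        long bytes = Long.parseLong(value.substring(0, value.length() - 1));
        if (bytes <= 0) {
            throw new IllegalArgumentException("Frame size must be positive, got: " + value);
        }
        return bytes;
    }
}
```

Falling back to a concrete, documented limit keeps the size check meaningful even when the setting is absent.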




[GitHub] zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225449585
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -109,7 +122,17 @@ public void onReceive(Object message) {
}
} else if (message instanceof CreateDump) {

MetricDumpSerialization.MetricSerializationResult dump = 
serializer.serialize(counters, gauges, histograms, meters);
-   getSender().tell(dump, getSelf());
+
+   int realMsgSize = dump.serializedMetrics.length;
+
+   if (realMsgSize > maximumFramesize) {
+   String overSizeErrorMsg = "The metric 
dump message size : " + realMsgSize
+   + " exceeds the maximum akka 
framesize : " + maximumFramesize + ".";
+   LOG.error(overSizeErrorMsg);
+   getSender().tell(new Status.Failure(new 
IOException(overSizeErrorMsg)), getSelf());
 
 Review comment:
   or even pass in maxSize to the serialization logic and only serialize up to 
that point.
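
One way to read this suggestion, as a sketch only: the serializer receives the byte budget and stops before the first record that would exceed it. `BoundedSerializerSketch`, its `Result` type, and the line-based record format are hypothetical stand-ins and do not reflect the real `MetricDumpSerialization` format.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical sketch: none of these names exist in Flink.
final class BoundedSerializerSketch {

    // Serialized bytes plus how many metrics were written and how many were left out.
    static final class Result {
        final byte[] bytes;
        final int written;
        final int skipped;

        Result(byte[] bytes, int written, int skipped) {
            this.bytes = bytes;
            this.written = written;
            this.skipped = skipped;
        }
    }

    // Writes "name=value\n" records in iteration order and stops before the
    // first record that would push the output past maxBytes.
    static Result serializeUpTo(Map<String, Long> counters, long maxBytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int written = 0;
        for (Map.Entry<String, Long> counter : counters.entrySet()) {
            byte[] record = (counter.getKey() + "=" + counter.getValue() + "\n")
                    .getBytes(StandardCharsets.UTF_8);
            if (out.size() + record.length > maxBytes) {
                break;
            }
            out.write(record, 0, record.length);
            written++;
        }
        return new Result(out.toByteArray(), written, counters.size() - written);
    }
}
```

Reporting the skipped count lets the caller log that the dump was truncated, which serializing the full dump and then discarding it would not.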




[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651332#comment-16651332
 ] 

ASF GitHub Bot commented on FLINK-10252:


zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225449585
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -109,7 +122,17 @@ public void onReceive(Object message) {
}
} else if (message instanceof CreateDump) {

MetricDumpSerialization.MetricSerializationResult dump = 
serializer.serialize(counters, gauges, histograms, meters);
-   getSender().tell(dump, getSelf());
+
+   int realMsgSize = dump.serializedMetrics.length;
+
+   if (realMsgSize > maximumFramesize) {
+   String overSizeErrorMsg = "The metric 
dump message size : " + realMsgSize
+   + " exceeds the maximum akka 
framesize : " + maximumFramesize + ".";
+   LOG.error(overSizeErrorMsg);
+   getSender().tell(new Status.Failure(new 
IOException(overSizeErrorMsg)), getSelf());
 
 Review comment:
   or even pass in maxSize to the serialization logic and only serialize up to 
that point.




> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages smaller than the current {{akka.framesize}}. We 
> should check, similarly to FLINK-10251, whether the payload exceeds the maximum 
> framesize and fail fast if it does.





[jira] [Commented] (FLINK-10251) Handle oversized response messages in AkkaRpcActor

2018-10-16 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651331#comment-16651331
 ] 

vinoyang commented on FLINK-10251:
--

Hi [~Zentol], regarding this oversize detection: do we only need to do it in 
AkkaRpcActor#handleRpcInvocation?

> Handle oversized response messages in AkkaRpcActor
> --
>
> Key: FLINK-10251
> URL: https://issues.apache.org/jira/browse/FLINK-10251
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> The {{AkkaRpcActor}} should check that an RPC response which is sent to a 
> remote sender does not exceed the maximum framesize of the underlying 
> {{ActorSystem}}. If it does, we should fail fast instead. We can 
> achieve this by serializing the response and sending the serialized byte 
> array.
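
The check described above could look roughly like the following sketch. It uses plain Java serialization as a stand-in; `ResponseSizeCheckSketch` and `serializeOrFail` are hypothetical names, and the serializer the actor system actually uses is not shown here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch: not the AkkaRpcActor implementation.
final class ResponseSizeCheckSketch {

    // Serializes the response up front and fails fast if the resulting byte
    // array is larger than the configured maximum frame size.
    static byte[] serializeOrFail(Serializable response, long maxFrameSizeBytes)
            throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(response);
        }
        byte[] bytes = buffer.toByteArray();
        if (bytes.length > maxFrameSizeBytes) {
            throw new IOException("The response size " + bytes.length
                    + " exceeds the maximum frame size " + maxFrameSizeBytes + ".");
        }
        return bytes;
    }
}
```

Serializing before sending means the size is known exactly, so the sender can be answered with a failure instead of the message being dropped silently.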





[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651328#comment-16651328
 ] 

ASF GitHub Bot commented on FLINK-10252:


zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225448197
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -70,6 +72,17 @@ public String filterCharacters(String input) {
private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
 
+   private long maximumFramesize;
+
+   @Override
+   public void preStart() throws Exception {
+   if 
(getContext().system().settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
 
 Review comment:
   We should rely on `akka.framesize` instead. 
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#akka-framesize




> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages smaller than the current {{akka.framesize}}. We 
> should check, similarly to FLINK-10251, whether the payload exceeds the maximum 
> framesize and fail fast if it does.





[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651329#comment-16651329
 ] 

ASF GitHub Bot commented on FLINK-10252:


zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225447615
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -70,6 +72,17 @@ public String filterCharacters(String input) {
private final Map> histograms 
= new HashMap<>();
private final Map> meters = new 
HashMap<>();
 
+   private long maximumFramesize;
+
+   @Override
+   public void preStart() throws Exception {
+   if 
(getContext().system().settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
+   maximumFramesize = 
getContext().system().settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
+   } else {
+   maximumFramesize = Long.MAX_VALUE;
 
 Review comment:
   this is not a reasonable default.




[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651330#comment-16651330
 ] 

ASF GitHub Bot commented on FLINK-10252:


zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225449312
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -109,7 +122,17 @@ public void onReceive(Object message) {
}
} else if (message instanceof CreateDump) {

MetricDumpSerialization.MetricSerializationResult dump = 
serializer.serialize(counters, gauges, histograms, meters);
-   getSender().tell(dump, getSelf());
+
+   int realMsgSize = dump.serializedMetrics.length;
+
+   if (realMsgSize > maximumFramesize) {
+   String overSizeErrorMsg = "The metric 
dump message size : " + realMsgSize
+   + " exceeds the maximum akka 
framesize : " + maximumFramesize + ".";
+   LOG.error(overSizeErrorMsg);
+   getSender().tell(new Status.Failure(new 
IOException(overSizeErrorMsg)), getSelf());
 
 Review comment:
   While this is better than crashing, a more reasonable option might be to create a new 
dump with a reduced number of metrics.
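
One way such a reduced dump could be assembled is sketched below. This is only an illustration under stated assumptions: each metric is taken to be already serialized into its own byte array, and `ReducedDump` and `fitToFrame` are hypothetical names that do not appear in the pull request.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; names are not taken from the pull request. */
final class ReducedDump {

    /** Serialized metrics that fit into the frame. */
    final List<byte[]> kept = new ArrayList<>();

    /** Number of metrics left out because they would overflow the frame. */
    int dropped;

    /**
     * Greedily keeps each entry that still fits into the remaining space;
     * entries that would push the total past maximumFramesize are counted as dropped.
     */
    static ReducedDump fitToFrame(List<byte[]> serializedMetrics, long maximumFramesize) {
        ReducedDump result = new ReducedDump();
        long used = 0;
        for (byte[] metric : serializedMetrics) {
            if (used + metric.length <= maximumFramesize) {
                result.kept.add(metric);
                used += metric.length;
            } else {
                result.dropped++;
            }
        }
        return result;
    }
}
```

Reporting the `dropped` count alongside the partial dump would let the receiver tell a truncated dump from a complete one.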




[jira] [Closed] (FLINK-10551) Remove legacy REST handlers

2018-10-16 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10551.

Resolution: Fixed

master: 5fb0e3d4e7d636c81fdaa4460123cc8b34037af3

> Remove legacy REST handlers
> ---
>
> Key: FLINK-10551
> URL: https://issues.apache.org/jira/browse/FLINK-10551
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>






[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225451475
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -456,7 +456,7 @@ public CheckpointStorage createCheckpointStorage(JobID 
jobId) throws IOException
priorityQueueStateType,
ttlTimeProvider,
getMemoryWatcherOptions(),
-   operatorMetricGroup
+   metricGroup
 
 Review comment:
   indentation is off




[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651341#comment-16651341
 ] 

ASF GitHub Bot commented on FLINK-10423:


zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225451475
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -456,7 +456,7 @@ public CheckpointStorage createCheckpointStorage(JobID 
jobId) throws IOException
priorityQueueStateType,
ttlTimeProvider,
getMemoryWatcherOptions(),
-   operatorMetricGroup
+   metricGroup
 
 Review comment:
   indentation is off




> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables,  etc that would be useful to users wishing 
> greater insight what rocksdb is doing. This work is inspired heavily by the 
> comments on this rocksdb issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)





[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225452289
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java
 ##
 @@ -49,5 +51,7 @@ StreamOperatorStateContext streamOperatorStateContext(
@Nonnull String operatorClassName,
@Nonnull KeyContext keyContext,
@Nullable TypeSerializer keySerializer,
-   @Nonnull CloseableRegistry streamTaskCloseableRegistry) throws 
Exception;
+   @Nonnull CloseableRegistry streamTaskCloseableRegistry,
+   @Nonnull MetricGroup metricGroup
+   ) throws Exception;
 
 Review comment:
   indentation is off




[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225452792
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
 ##
 @@ -192,7 +196,8 @@ public OperatorStateBackend createOperatorStateBackend(
streamOperator.getClass().getSimpleName(),
streamOperator,
typeSerializer,
-   closeableRegistry);
+   closeableRegistry,
+   
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup());
 
 Review comment:
   Use `new UnregisteredMetricsGroup()` instead (a separate class, more generic 
than the ones from `UnregisteredMetricGroups`). This also applies to other usages.




[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225454242
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.Closeable;
+import java.math.BigInteger;
+
+/**
+ * A monitor which pulls {{@link RocksDB}} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBNativeMetricMonitor.class);
+
+   private final RocksDBNativeMetricOptions options;
+
+   private final MetricGroup metricGroup;
+
+   private final Object lock;
+
+   @GuardedBy("lock")
+   private RocksDB rocksDB;
+
+   RocksDBNativeMetricMonitor(
+   @Nonnull RocksDB rocksDB,
+   @Nonnull RocksDBNativeMetricOptions options,
+   @Nonnull MetricGroup metricGroup
+   ) {
+   this.options = options;
+   this.metricGroup = metricGroup;
+   this.rocksDB = rocksDB;
+
+   this.lock = new Object();
+   }
+
+   /**
+* Register gauges to pull native metrics for the column family.
+* @param columnFamilyName group name for the new gauges
+* @param handle native handle to the column family
+*/
+   void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle 
handle) {
+   MetricGroup group = metricGroup.addGroup(columnFamilyName);
+
+   for (String property : options.getProperties()) {
+   RocksDBNativeMetricView gauge = new 
RocksDBNativeMetricView(handle, property);
+   group.gauge(property, gauge);
+   }
+   }
+
+   /**
+* Updates the value of metricView if the reference is still valid.
+*/
+   private void setProperty(ColumnFamilyHandle handle, String property, 
RocksDBNativeMetricView metricView) {
+   try {
+   synchronized (lock) {
+   if (rocksDB != null) {
+   long value = 
rocksDB.getLongProperty(handle, property);
+   metricView.setValue(value);
+   }
+   }
+   } catch (RocksDBException e) {
+   LOG.warn("Failed to read native metric %s from 
RocksDB", property, e);
 
 Review comment:
   We've had issues in the past where the amount of logging messages was 
drowning out other messages.
   If this exception occurs, and if multiple metrics (or god forbid all of 
them) are enabled, then this could lead to a large amount of logging messages.
   
   I suggest adding a flag to each gauge so that this message is only logged 
once for each of them.
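
The suggested per-gauge flag could look roughly like this. It is a minimal sketch, not the `RocksDBNativeMetricView` from the pull request: `LogOnceGauge` and `onReadFailure` are hypothetical names, and a `Consumer<String>` stands in for the logger so that the example is self-contained.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical gauge wrapper for illustration; not the class from the pull request. */
final class LogOnceGauge {

    private final String property;

    /** Stand-in for LOG.warn, so the sketch needs no logging library. */
    private final Consumer<String> warn;

    /** Flips to true on the first failure and suppresses later warnings. */
    private final AtomicBoolean warned = new AtomicBoolean(false);

    LogOnceGauge(String property, Consumer<String> warn) {
        this.property = property;
        this.warn = warn;
    }

    /** Called whenever reading the property fails; only the first call logs. */
    void onReadFailure(Exception cause) {
        if (warned.compareAndSet(false, true)) {
            warn.accept("Failed to read native metric " + property + " from RocksDB: " + cause);
        }
    }
}
```

`compareAndSet` keeps the flag safe if the gauge is updated from more than one thread. As a side note, SLF4J substitutes `{}` placeholders rather than `%s`, so the quoted `LOG.warn` call would print the `%s` literally.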



[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225454449
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.Closeable;
+import java.math.BigInteger;
+
+/**
+ * A monitor which pulls {{@link RocksDB}} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBNativeMetricMonitor.class);
+
+   private final RocksDBNativeMetricOptions options;
+
+   private final MetricGroup metricGroup;
+
+   private final Object lock;
+
+   @GuardedBy("lock")
+   private RocksDB rocksDB;
+
+   RocksDBNativeMetricMonitor(
+   @Nonnull RocksDB rocksDB,
+   @Nonnull RocksDBNativeMetricOptions options,
+   @Nonnull MetricGroup metricGroup
+   ) {
+   this.options = options;
+   this.metricGroup = metricGroup;
+   this.rocksDB = rocksDB;
+
+   this.lock = new Object();
+   }
+
+   /**
+* Register gauges to pull native metrics for the column family.
+* @param columnFamilyName group name for the new gauges
+* @param handle native handle to the column family
+*/
+   void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle 
handle) {
+   MetricGroup group = metricGroup.addGroup(columnFamilyName);
+
+   for (String property : options.getProperties()) {
+   RocksDBNativeMetricView gauge = new 
RocksDBNativeMetricView(handle, property);
+   group.gauge(property, gauge);
+   }
+   }
+
+   /**
+* Updates the value of metricView if the reference is still valid.
+*/
+   private void setProperty(ColumnFamilyHandle handle, String property, 
RocksDBNativeMetricView metricView) {
+   try {
+   synchronized (lock) {
+   if (rocksDB != null) {
+   long value = 
rocksDB.getLongProperty(handle, property);
+   metricView.setValue(value);
+   }
+   }
+   } catch (RocksDBException e) {
+   LOG.warn("Failed to read native metric %s from 
RocksDB", property, e);
+   }
+   }
+
+   @Override
+   public void close() {
+   synchronized (lock) {
+   rocksDB = null;
+   }
+   }
+
+   /**
+* A gauge which periodically pull a RocksDB native metric
 
 Review comment:
   pull -> pulls




[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225454760
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyHandle;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+
+/**
+ * validate native metric monitor.
+ */
+public class RocksDBNativeMetricMonitorTest {
+
+   private static final String OPERATOR_NAME = "dummy";
+
+   private static final String COLUMN_FAMILY_NAME = "column-family";
+
+   private static final BigInteger ZERO = new BigInteger(1, new byte[]{0, 
0, 0, 0});
+
+   @Test
+   public void testMetricMonitorLifecycle() throws Throwable {
+   //We use a local variable here to manually control the 
life-cycle.
+   // This allows us to verify that metrics do not try to access
+   // RocksDB after the monitor was closed.
+   RocksDBResource rocksDBResource = new RocksDBResource();
+   rocksDBResource.before();
+
+   SimpleMetricRegistry registry = new SimpleMetricRegistry();
+   OperatorMetricGroup group = new OperatorMetricGroup(
+   registry,
+   
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+   new OperatorID(),
+   OPERATOR_NAME
+   );
+
+   RocksDBNativeMetricOptions options = new 
RocksDBNativeMetricOptions();
+   // always returns a non-zero
+   // value since empty memtables
+   // have overhead.
+   options.enableSizeAllMemTables();
+
+   RocksDBNativeMetricMonitor monitor = new 
RocksDBNativeMetricMonitor(
+   rocksDBResource.getRocksDB(),
+   options,
+   group
+   );
+
+   ColumnFamilyHandle handle = rocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME);
+   monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
+
+   Assert.assertEquals("Failed to register metrics for column family", 1, registry.metrics.size());
+
+   RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
+
+   view.update();
+
+   Assert.assertNotEquals("Failed to pull metric from RocksDB", ZERO, view.getValue());
+
+   view.setValue(0L);
+
+   // After the monitor is closed no metric should be accessing RocksDB anymore.
+   // If they do, then this test will likely fail with a segmentation fault.
+   monitor.close();
+
+   rocksDBResource.after();
+
+   view.update();
+
+   Assert.assertEquals("Failed to release RocksDB reference", ZERO, view.getValue());
+   }
+
+   @Test
+   public void testReturnsUnsigned() throws Throwable {
+   SimpleMetricRegistry registry = new SimpleMetricRegistry();
+   OperatorMetricGroup group = new OperatorMetricGroup(
 
 Review comment:
   this can be simplified to `MetricGroup group = new UnregisteredMetricGroups()`.


This is an automated message from the Apache 

[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225454717
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyHandle;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+
+/**
+ * validate native metric monitor.
+ */
+public class RocksDBNativeMetricMonitorTest {
+
+   private static final String OPERATOR_NAME = "dummy";
+
+   private static final String COLUMN_FAMILY_NAME = "column-family";
+
+   private static final BigInteger ZERO = new BigInteger(1, new byte[]{0, 0, 0, 0});
+
+   @Test
+   public void testMetricMonitorLifecycle() throws Throwable {
+   // We use a local variable here to manually control the life-cycle.
+   // This allows us to verify that metrics do not try to access
+   // RocksDB after the monitor was closed.
+   RocksDBResource rocksDBResource = new RocksDBResource();
+   rocksDBResource.before();
+
+   SimpleMetricRegistry registry = new SimpleMetricRegistry();
+   OperatorMetricGroup group = new OperatorMetricGroup(
 
 Review comment:
   this can be simplified to `MetricGroup group = new UnregisteredMetricGroups()`.
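
   Side note on the `ZERO` constant in the diff above: it is built with the
   sign-and-magnitude constructor `new BigInteger(1, bytes)`. A minimal sketch
   of why that form suits unsigned 64-bit values is below. The class and method
   names (`UnsignedLongSketch`, `toUnsigned`) are hypothetical and only for
   illustration; this is not taken from the PR and may not match how the metric
   view converts values.

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the PR: reinterprets the 64 bits of a
// signed long as an unsigned value.
public class UnsignedLongSketch {

    static BigInteger toUnsigned(long value) {
        // ByteBuffer writes big-endian by default, which is the byte order
        // the BigInteger magnitude constructor expects.
        byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        // Signum 1 means "treat the bytes as a non-negative magnitude";
        // an all-zero magnitude still yields zero.
        return new BigInteger(1, bytes);
    }

    public static void main(String[] args) {
        System.out.println(toUnsigned(0L));
        // -1L has all 64 bits set, so the unsigned reading is 2^64 - 1.
        System.out.println(toUnsigned(-1L));
    }
}
```

   An all-zero byte array with signum 1 compares equal to `BigInteger.ZERO`,
   so the assertions against `ZERO` behave the same as comparing with the
   built-in constant.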


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651356#comment-16651356
 ] 

ASF GitHub Bot commented on FLINK-10423:


zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225454760
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyHandle;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+
+/**
+ * validate native metric monitor.
+ */
+public class RocksDBNativeMetricMonitorTest {
+
+   private static final String OPERATOR_NAME = "dummy";
+
+   private static final String COLUMN_FAMILY_NAME = "column-family";
+
+   private static final BigInteger ZERO = new BigInteger(1, new byte[]{0, 0, 0, 0});
+
+   @Test
+   public void testMetricMonitorLifecycle() throws Throwable {
+   // We use a local variable here to manually control the life-cycle.
+   // This allows us to verify that metrics do not try to access
+   // RocksDB after the monitor was closed.
+   RocksDBResource rocksDBResource = new RocksDBResource();
+   rocksDBResource.before();
+
+   SimpleMetricRegistry registry = new SimpleMetricRegistry();
+   OperatorMetricGroup group = new OperatorMetricGroup(
+   registry,
+   UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+   new OperatorID(),
+   OPERATOR_NAME
+   );
+
+   RocksDBNativeMetricOptions options = new RocksDBNativeMetricOptions();
+   // always returns a non-zero
+   // value since empty memtables
+   // have overhead.
+   options.enableSizeAllMemTables();
+
+   RocksDBNativeMetricMonitor monitor = new RocksDBNativeMetricMonitor(
+   rocksDBResource.getRocksDB(),
+   options,
+   group
+   );
+
+   ColumnFamilyHandle handle = rocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME);
+   monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
+
+   Assert.assertEquals("Failed to register metrics for column family", 1, registry.metrics.size());
+
+   RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
+
+   view.update();
+
+   Assert.assertNotEquals("Failed to pull metric from RocksDB", ZERO, view.getValue());
+
+   view.setValue(0L);
+
+   // After the monitor is closed no metric should be accessing RocksDB anymore.
+   // If they do, then this test will likely fail with a segmentation fault.
+   monitor.close();
+
+   rocksDBResource.after();
+
+   view.update();
+
+   Assert.assertEquals("Failed to release RocksDB reference", ZERO, view.getValue());
+   }
+
+   @Test
+   public void testReturnsUnsigned() throws Throwable {
+   SimpleMetricRegistry registry = new SimpleMetricRegistry();
+   Operato

[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651355#comment-16651355
 ] 

ASF GitHub Bot commented on FLINK-10423:


zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225454717
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyHandle;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+
+/**
+ * validate native metric monitor.
+ */
+public class RocksDBNativeMetricMonitorTest {
+
+   private static final String OPERATOR_NAME = "dummy";
+
+   private static final String COLUMN_FAMILY_NAME = "column-family";
+
+   private static final BigInteger ZERO = new BigInteger(1, new byte[]{0, 0, 0, 0});
+
+   @Test
+   public void testMetricMonitorLifecycle() throws Throwable {
+   // We use a local variable here to manually control the life-cycle.
+   // This allows us to verify that metrics do not try to access
+   // RocksDB after the monitor was closed.
+   RocksDBResource rocksDBResource = new RocksDBResource();
+   rocksDBResource.before();
+
+   SimpleMetricRegistry registry = new SimpleMetricRegistry();
+   OperatorMetricGroup group = new OperatorMetricGroup(
 
 Review comment:
   this can be simplified to `MetricGroup group = new UnregisteredMetricGroups()`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Forward RocksDB native metrics to Flink metrics reporter
> --------------------------------------------------------
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
> Issue Type: New Feature
> Components: Metrics, State Backends, Checkpointing
> Reporter: Seth Wiesman
> Assignee: Seth Wiesman
> Priority: Major
> Labels: pull-request-available
>
> RocksDB exposes a number of metrics at the column family level, such as current
> memory usage and open memtables, that would be useful to users who want
> greater insight into what RocksDB is doing. This work is heavily inspired by the
> comments on this RocksDB issue thread
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651354#comment-16651354
 ] 

ASF GitHub Bot commented on FLINK-10423:


zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225454449
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.Closeable;
+import java.math.BigInteger;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+   private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricMonitor.class);
+
+   private final RocksDBNativeMetricOptions options;
+
+   private final MetricGroup metricGroup;
+
+   private final Object lock;
+
+   @GuardedBy("lock")
+   private RocksDB rocksDB;
+
+   RocksDBNativeMetricMonitor(
+   @Nonnull RocksDB rocksDB,
+   @Nonnull RocksDBNativeMetricOptions options,
+   @Nonnull MetricGroup metricGroup
+   ) {
+   this.options = options;
+   this.metricGroup = metricGroup;
+   this.rocksDB = rocksDB;
+
+   this.lock = new Object();
+   }
+
+   /**
+* Register gauges to pull native metrics for the column family.
+* @param columnFamilyName group name for the new gauges
+* @param handle native handle to the column family
+*/
+   void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) {
+   MetricGroup group = metricGroup.addGroup(columnFamilyName);
+
+   for (String property : options.getProperties()) {
+   RocksDBNativeMetricView gauge = new RocksDBNativeMetricView(handle, property);
+   group.gauge(property, gauge);
+   }
+   }
+
+   /**
+* Updates the value of metricView if the reference is still valid.
+*/
+   private void setProperty(ColumnFamilyHandle handle, String property, RocksDBNativeMetricView metricView) {
+   try {
+   synchronized (lock) {
+   if (rocksDB != null) {
+   long value = rocksDB.getLongProperty(handle, property);
+   metricView.setValue(value);
+   }
+   }
+   } catch (RocksDBException e) {
+   LOG.warn("Failed to read native metric {} from RocksDB", property, e);
+   }
+   }
+
+   @Override
+   public void close() {
+   synchronized (lock) {
+   rocksDB = null;
+   }
+   }
+
+   /**
+* A gauge which periodically pull a RocksDB native metric
 
 Review comment:
   pull -> pulls
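
   For readers following the lifecycle logic in the diff above (`close()` nulls
   the `rocksDB` reference under `lock`, so later gauge updates become no-ops),
   here is a minimal sketch of the same guard without any RocksDB or Flink
   dependency. `GuardedMonitorSketch`, `NativeSource` and `Monitor` are
   hypothetical stand-ins invented for this illustration, and the constant 42
   is arbitrary; the real classes in the PR differ.

```java
// Hypothetical, dependency-free sketch of the close-guard pattern.
public class GuardedMonitorSketch {

    /** Stand-in for a native resource that must not be touched after disposal. */
    static final class NativeSource {
        private boolean open = true;

        long read() {
            if (!open) {
                // The real native library could crash here instead.
                throw new IllegalStateException("read after dispose");
            }
            return 42L;
        }

        void dispose() {
            open = false;
        }
    }

    /** Stand-in for the monitor: reads the source only while it holds a reference. */
    static final class Monitor implements AutoCloseable {
        private final Object lock = new Object();
        private NativeSource source; // guarded by lock
        private long lastValue;      // guarded by lock

        Monitor(NativeSource source) {
            this.source = source;
        }

        void update() {
            synchronized (lock) {
                if (source != null) {
                    lastValue = source.read();
                }
            }
        }

        long getValue() {
            synchronized (lock) {
                return lastValue;
            }
        }

        void setValue(long value) {
            synchronized (lock) {
                lastValue = value;
            }
        }

        @Override
        public void close() {
            synchronized (lock) {
                // Dropping the reference turns every later update() into a no-op.
                source = null;
            }
        }
    }

    public static void main(String[] args) {
        NativeSource source = new NativeSource();
        Monitor monitor = new Monitor(source);

        monitor.update();
        System.out.println(monitor.getValue()); // value read from the source

        monitor.setValue(0L);
        monitor.close();
        source.dispose();

        monitor.update(); // does not touch the disposed source
        System.out.println(monitor.getValue()); // still the value set before close
    }
}
```

   The sequence in `main` mirrors the order of calls in
   `testMetricMonitorLifecycle`: update, reset, close the monitor, release the
   resource, then update again and check that the value did not change.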


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Forward RocksDB native metrics to Flink metrics reporter 
> ---

[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651359#comment-16651359
 ] 

ASF GitHub Bot commented on FLINK-10423:


zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225454717
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyHandle;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+
+/**
+ * validate native metric monitor.
+ */
+public class RocksDBNativeMetricMonitorTest {
+
+   private static final String OPERATOR_NAME = "dummy";
+
+   private static final String COLUMN_FAMILY_NAME = "column-family";
+
+   private static final BigInteger ZERO = new BigInteger(1, new byte[]{0, 0, 0, 0});
+
+   @Test
+   public void testMetricMonitorLifecycle() throws Throwable {
+   //We use a local variable here to manually control the life-cycle.
+   // This allows us to verify that metrics do not try to access
+   // RocksDB after the monitor was closed.
+   RocksDBResource rocksDBResource = new RocksDBResource();
+   rocksDBResource.before();
+
+   SimpleMetricRegistry registry = new SimpleMetricRegistry();
+   OperatorMetricGroup group = new OperatorMetricGroup(
 
 Review comment:
   this can be simplified to `MetricGroup group = new UnregisteredMetricGroups()`.




> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables, etc. that would be useful to users wishing for 
> greater insight into what RocksDB is doing. This work is inspired heavily by the 
> comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)





[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225455353
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyHandle;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+
+/**
+ * validate native metric monitor.
+ */
+public class RocksDBNativeMetricMonitorTest {
+
+   private static final String OPERATOR_NAME = "dummy";
+
+   private static final String COLUMN_FAMILY_NAME = "column-family";
+
+   private static final BigInteger ZERO = new BigInteger(1, new byte[]{0, 0, 0, 0});
+
+   @Test
+   public void testMetricMonitorLifecycle() throws Throwable {
+   //We use a local variable here to manually control the life-cycle.
+   // This allows us to verify that metrics do not try to access
+   // RocksDB after the monitor was closed.
+   RocksDBResource rocksDBResource = new RocksDBResource();
+   rocksDBResource.before();
+
+   SimpleMetricRegistry registry = new SimpleMetricRegistry();
+   OperatorMetricGroup group = new OperatorMetricGroup(
 
 Review comment:
   could be simplified by using a `GenericMetricGroup`




[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651363#comment-16651363
 ] 

ASF GitHub Bot commented on FLINK-10423:


zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225455353
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyHandle;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+
+/**
+ * validate native metric monitor.
+ */
+public class RocksDBNativeMetricMonitorTest {
+
+   private static final String OPERATOR_NAME = "dummy";
+
+   private static final String COLUMN_FAMILY_NAME = "column-family";
+
+   private static final BigInteger ZERO = new BigInteger(1, new byte[]{0, 0, 0, 0});
+
+   @Test
+   public void testMetricMonitorLifecycle() throws Throwable {
+   //We use a local variable here to manually control the life-cycle.
+   // This allows us to verify that metrics do not try to access
+   // RocksDB after the monitor was closed.
+   RocksDBResource rocksDBResource = new RocksDBResource();
+   rocksDBResource.before();
+
+   SimpleMetricRegistry registry = new SimpleMetricRegistry();
+   OperatorMetricGroup group = new OperatorMetricGroup(
 
 Review comment:
   could be simplified by using a `GenericMetricGroup`




> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables, etc. that would be useful to users wishing for 
> greater insight into what RocksDB is doing. This work is inspired heavily by the 
> comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)





[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225455306
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyHandle;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+
+/**
+ * validate native metric monitor.
+ */
+public class RocksDBNativeMetricMonitorTest {
+
+   private static final String OPERATOR_NAME = "dummy";
+
+   private static final String COLUMN_FAMILY_NAME = "column-family";
+
+   private static final BigInteger ZERO = new BigInteger(1, new byte[]{0, 0, 0, 0});
+
+   @Test
+   public void testMetricMonitorLifecycle() throws Throwable {
+   //We use a local variable here to manually control the life-cycle.
+   // This allows us to verify that metrics do not try to access
+   // RocksDB after the monitor was closed.
+   RocksDBResource rocksDBResource = new RocksDBResource();
+   rocksDBResource.before();
+
+   SimpleMetricRegistry registry = new SimpleMetricRegistry();
+   OperatorMetricGroup group = new OperatorMetricGroup(
+   registry,
+   UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+   new OperatorID(),
+   OPERATOR_NAME
+   );
+
+   RocksDBNativeMetricOptions options = new RocksDBNativeMetricOptions();
+   // always returns a non-zero
+   // value since empty memtables
+   // have overhead.
+   options.enableSizeAllMemTables();
+
+   RocksDBNativeMetricMonitor monitor = new RocksDBNativeMetricMonitor(
+   rocksDBResource.getRocksDB(),
+   options,
+   group
+   );
+
+   ColumnFamilyHandle handle = rocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME);
+   monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
+
+   Assert.assertEquals("Failed to register metrics for column family", 1, registry.metrics.size());
+
+   RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
+
+   view.update();
+
+   Assert.assertNotEquals("Failed to pull metric from RocksDB", ZERO, view.getValue());
+
+   view.setValue(0L);
+
+   //After the monitor is closed no metric should be accessing RocksDB anymore.
+   //If they do, then this test will likely fail with a segmentation fault.
+   monitor.close();
+
+   rocksDBResource.after();
+
+   view.update();
+
+   Assert.assertEquals("Failed to release RocksDB reference", ZERO, view.getValue());
+   }
+
+   @Test
+   public void testReturnsUnsigned() throws Throwable {
+   SimpleMetricRegistry registry = new SimpleMetricRegistry();
+   OperatorMetricGroup group = new OperatorMetricGroup(
 
 Review comment:
   could be simplified by using a `GenericMetricGroup`



[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651361#comment-16651361
 ] 

ASF GitHub Bot commented on FLINK-10423:


zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r225455306
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyHandle;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+
+/**
+ * validate native metric monitor.
+ */
+public class RocksDBNativeMetricMonitorTest {
+
+   private static final String OPERATOR_NAME = "dummy";
+
+   private static final String COLUMN_FAMILY_NAME = "column-family";
+
+   private static final BigInteger ZERO = new BigInteger(1, new byte[]{0, 0, 0, 0});
+
+   @Test
+   public void testMetricMonitorLifecycle() throws Throwable {
+   //We use a local variable here to manually control the life-cycle.
+   // This allows us to verify that metrics do not try to access
+   // RocksDB after the monitor was closed.
+   RocksDBResource rocksDBResource = new RocksDBResource();
+   rocksDBResource.before();
+
+   SimpleMetricRegistry registry = new SimpleMetricRegistry();
+   OperatorMetricGroup group = new OperatorMetricGroup(
+   registry,
+   UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+   new OperatorID(),
+   OPERATOR_NAME
+   );
+
+   RocksDBNativeMetricOptions options = new RocksDBNativeMetricOptions();
+   // always returns a non-zero
+   // value since empty memtables
+   // have overhead.
+   options.enableSizeAllMemTables();
+
+   RocksDBNativeMetricMonitor monitor = new RocksDBNativeMetricMonitor(
+   rocksDBResource.getRocksDB(),
+   options,
+   group
+   );
+
+   ColumnFamilyHandle handle = rocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME);
+   monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
+
+   Assert.assertEquals("Failed to register metrics for column family", 1, registry.metrics.size());
+
+   RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
+
+   view.update();
+
+   Assert.assertNotEquals("Failed to pull metric from RocksDB", ZERO, view.getValue());
+
+   view.setValue(0L);
+
+   //After the monitor is closed no metric should be accessing RocksDB anymore.
+   //If they do, then this test will likely fail with a segmentation fault.
+   monitor.close();
+
+   rocksDBResource.after();
+
+   view.update();
+
+   Assert.assertEquals("Failed to release RocksDB reference", ZERO, view.getValue());
+   }
+
+   @Test
+   public void testReturnsUnsigned() throws Throwable {
+   SimpleMetricRegistry registry = new SimpleMetricRegistry();
+   Operato

[GitHub] yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges

2018-10-16 Thread GitBox
yanghua commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225457884
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -109,7 +122,17 @@ public void onReceive(Object message) {
}
} else if (message instanceof CreateDump) {

MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-   getSender().tell(dump, getSelf());
+
+   int realMsgSize = dump.serializedMetrics.length;
+
+   if (realMsgSize > maximumFramesize) {
+   String overSizeErrorMsg = "The metric dump message size : " + realMsgSize
+   + " exceeds the maximum akka framesize : " + maximumFramesize + ".";
+   LOG.error(overSizeErrorMsg);
+   getSender().tell(new Status.Failure(new IOException(overSizeErrorMsg)), getSelf());
 
 Review comment:
   If we pass incomplete data, the receiver will fail to parse it, 
which would likely cause trouble for users?




[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651369#comment-16651369
 ] 

ASF GitHub Bot commented on FLINK-10252:


yanghua commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225457884
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -109,7 +122,17 @@ public void onReceive(Object message) {
}
} else if (message instanceof CreateDump) {

MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-   getSender().tell(dump, getSelf());
+
+   int realMsgSize = dump.serializedMetrics.length;
+
+   if (realMsgSize > maximumFramesize) {
+   String overSizeErrorMsg = "The metric dump message size : " + realMsgSize
+   + " exceeds the maximum akka framesize : " + maximumFramesize + ".";
+   LOG.error(overSizeErrorMsg);
+   getSender().tell(new Status.Failure(new IOException(overSizeErrorMsg)), getSelf());
 
 Review comment:
   If we pass incomplete data, the receiver will fail to parse it, 
which would likely cause trouble for users?




> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages smaller than the current {{akka.framesize}}. We should check, 
> similarly to FLINK-10251, whether the payload exceeds the maximum framesize 
> and fail fast if it does.
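
The fail-fast check described above can be sketched in plain Java. This is only an illustration under stated assumptions: `FrameSizeCheck` and `checkFits` are hypothetical names that do not exist in Flink, and the sketch assumes the payload is already serialized to a byte array and the maximum framesize is known in bytes.

```java
import java.io.IOException;

/** Hypothetical helper for illustration only; not part of Flink or Akka. */
public final class FrameSizeCheck {

    private FrameSizeCheck() {}

    /**
     * Returns the serialized payload unchanged if it fits into the frame,
     * otherwise fails fast with an IOException instead of sending it.
     */
    public static byte[] checkFits(byte[] serializedPayload, long maximumFramesize)
            throws IOException {
        if (serializedPayload.length > maximumFramesize) {
            throw new IOException(
                "The message size " + serializedPayload.length
                    + " exceeds the maximum framesize " + maximumFramesize + ".");
        }
        return serializedPayload;
    }

    public static void main(String[] args) {
        try {
            // A payload below the limit passes through untouched.
            System.out.println(checkFits(new byte[16], 32).length);
            // A payload above the limit is rejected before any send would happen.
            checkFits(new byte[64], 32);
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In an actor, the caller would presumably turn the exception into a failure reply to the sender rather than let it propagate, which is what the pull request under review does with `Status.Failure`.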





[jira] [Commented] (FLINK-8577) Implement proctime DataStream to Table upsert conversion.

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651377#comment-16651377
 ] 

ASF GitHub Bot commented on FLINK-8577:
---

hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-430163860
 
 
   As discussed with Fabian in 
[FLINK-8578](https://issues.apache.org/jira/browse/FLINK-8578), we have to 
materialize the proc-time field and convert row-time attributes into regular 
TIMESTAMP attributes. I will update the PR ASAP. 




> Implement proctime DataStream to Table upsert conversion.
> -
>
> Key: FLINK-8577
> URL: https://issues.apache.org/jira/browse/FLINK-8577
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> The API will look like:
> {code:java}
> DataStream[(String, Long, Int)] input = ???
> // upsert with key
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code}
> A simple design 
> [doc|https://docs.google.com/document/d/1yMEi9xkm3pNYNmmZykbLzqtKjFVezaWe0y7Xqd0c1zE/edit?usp=sharing]
>  about this subtask.





[GitHub] hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-10-16 Thread GitBox
hequn8128 commented on issue #6787: [FLINK-8577][table] Implement proctime 
DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#issuecomment-430163860
 
 
   As discussed with Fabian in 
[FLINK-8578](https://issues.apache.org/jira/browse/FLINK-8578), we have to 
materialize the proc-time field and convert row-time attributes into regular 
TIMESTAMP attributes. I will update the PR ASAP. 




[jira] [Commented] (FLINK-10251) Handle oversized response messages in AkkaRpcActor

2018-10-16 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651376#comment-16651376
 ] 

Chesnay Schepler commented on FLINK-10251:
--

I don't know enough about akka to answer that. [~till.rohrmann] may know.

> Handle oversized response messages in AkkaRpcActor
> --
>
> Key: FLINK-10251
> URL: https://issues.apache.org/jira/browse/FLINK-10251
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> The {{AkkaRpcActor}} should check whether an RPC response which is sent to a 
> remote sender exceeds the maximum framesize of the underlying 
> {{ActorSystem}}. If it does, we should fail fast instead. We can 
> achieve this by serializing the response and sending the serialized byte 
> array.
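
The idea of serializing the response first and checking the size of the resulting byte array can be sketched as follows. This is a minimal illustration, not the actual `AkkaRpcActor` logic: `SerializedResponseCheck` and `serializeOrFail` are hypothetical names, and plain Java serialization is assumed here only to keep the example self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical helper for illustration only; not part of Flink or Akka. */
public final class SerializedResponseCheck {

    private SerializedResponseCheck() {}

    /**
     * Serializes the response with Java serialization (an assumption of this
     * sketch) and fails fast if the byte array is larger than the frame.
     */
    public static byte[] serializeOrFail(Serializable response, long maximumFramesize)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(response);
        }
        byte[] serialized = bytes.toByteArray();
        if (serialized.length > maximumFramesize) {
            throw new IOException(
                "The response size " + serialized.length
                    + " exceeds the maximum framesize " + maximumFramesize + ".");
        }
        // The caller would send this byte array instead of the original object.
        return serialized;
    }
}
```

Sending the already serialized bytes means the size that was checked is the size that actually goes over the wire, so the check cannot be invalidated by a later serialization step.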





[GitHub] kl0u commented on a change in pull request #6776: [FLINK-9715][table] Support temporal join with event time

2018-10-16 Thread GitBox
kl0u commented on a change in pull request #6776: [FLINK-9715][table] Support 
temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225459329
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping on the state collection of probe and build records to process
+  * on next watermark. The idea is that between watermarks we are collecting those elements
+  * and once we are sure that there will be no updates we emit the correct result and clean up the
+  * state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older than the current watermark. Build side is also cleaned up in the similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how the emitting results and cleaning up is triggered. It is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when watermark exceeds this timer, that's when we are emitting and/or
+  * cleaning up the state). However this would cause huge number of registered timers. For example
+  * with following event times of probe records accumulated: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that
+  * we always keep only one single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older than or equal to
+  * currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`.
+* We can not use List to accumulate Rows, because we need efficient deletes of the oldest rows.
+*
+* TODO: this could be OrderedMultiMap[Jlong, Row] indexed by row's 
timestamp, to avoid
+* full map traversals (if we have lots of rows on the state th
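
The single-timer-per-key trick described in the class comment of the quoted diff can be sketched in isolation. The class below is a hypothetical, heavily simplified model for one key, written in Java rather than the Scala of the pull request: it uses no Flink API, buffers only event times, and ignores the build side (including the rule that the latest build-side record is always kept).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model of "one registered timer per key"; not Flink code. */
public final class SingleTimerSketch {

    // Event times of buffered probe records for one key, smallest first.
    private final PriorityQueue<Long> bufferedEventTimes = new PriorityQueue<>();

    // The only registered timer for this key, or null if none is registered.
    private Long registeredTimer = null;

    /** Buffers a record and keeps the timer at the minimal buffered event time. */
    public void onRecord(long eventTime) {
        bufferedEventTimes.add(eventTime);
        if (registeredTimer == null || eventTime < registeredTimer) {
            registeredTimer = eventTime;
        }
    }

    /** Fires at most once per watermark and drains all records up to it. */
    public List<Long> onWatermark(long watermark) {
        List<Long> processed = new ArrayList<>();
        if (registeredTimer == null || registeredTimer > watermark) {
            return processed; // the single timer has not been reached yet
        }
        while (!bufferedEventTimes.isEmpty() && bufferedEventTimes.peek() <= watermark) {
            processed.add(bufferedEventTimes.poll());
        }
        // Re-register for the next minimal event time, if any records remain.
        registeredTimer = bufferedEventTimes.peek();
        return processed;
    }

    public Long registeredTimer() {
        return registeredTimer;
    }
}
```

With the event times {1, 2, 5, 8, 9} from the comment, only the timer for 1 is registered; a watermark of 10 triggers it once and all five records are processed in that single callback.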

[GitHub] kl0u commented on a change in pull request #6776: [FLINK-9715][table] Support temporal join with event time

2018-10-16 Thread GitBox
kl0u commented on a change in pull request #6776: [FLINK-9715][table] Support 
temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225459775
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state the collection of probe and build records to
+  * process on the next watermark. The idea is that between watermarks we collect those
+  * elements, and once we are sure that there will be no further updates we emit the correct
+  * result and clean up the state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is
+  * defined as older than the current watermark. The build side is cleaned up in a similar
+  * fashion, however we always keep at least one record - the latest one - even if it is past
+  * the last watermark.
+  *
+  * One more trick is how emitting results and cleaning up are triggered. This is achieved by
+  * registering timers for the keys. We could register a timer for every probe and build side
+  * element's event time (when the watermark exceeds this timer, that is when we emit and/or
+  * clean up the state). However, this would cause a huge number of registered timers. For
+  * example, with the following event times of accumulated probe records: {1, 2, 5, 8, 9}, if
+  * we received Watermark(10), it would trigger 5 separate timers for the same key. To avoid
+  * that, we always keep only a single registered timer for any given key, registered for the
+  * minimal value. Upon triggering it, we process all records with event times older than or
+  * equal to currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new 
RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into 
the left side `Row`.
+* We can not use List to accumulate Rows, because we need efficient 
deletes of the oldest rows.
+*
+* TODO: this could be OrderedMultiMap[Jlong, Row] indexed by row's 
timestamp, to avoid
+* full map traversals (if we have lots of rows on the state th
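
The single-timer idea described in the Scaladoc above can be sketched in plain Java. This is a simplified, hypothetical model and not the operator from the pull request: the class `SingleTimerSketch` and its methods are invented for illustration, one instance stands for the state of one key, and timers are modelled as a plain field rather than Flink's timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical, simplified model of per-key timer coalescing (not the PR's code). */
final class SingleTimerSketch {
    // Buffered event times for one key, with the number of records per timestamp.
    private final TreeMap<Long, Integer> buffered = new TreeMap<>();
    // The single registered timer for this key, or null if none is registered.
    private Long registeredTimer = null;
    // How many times a timer has fired so far.
    private int timerFirings = 0;

    void onElement(long eventTime) {
        buffered.merge(eventTime, 1, Integer::sum);
        // Keep only one timer, registered for the minimal buffered event time.
        if (registeredTimer == null || eventTime < registeredTimer) {
            registeredTimer = eventTime;
        }
    }

    /** Returns the event times processed by this watermark, in ascending order. */
    List<Long> onWatermark(long watermark) {
        List<Long> processed = new ArrayList<>();
        if (registeredTimer == null || registeredTimer > watermark) {
            return processed; // the timer has not been reached yet
        }
        timerFirings++;
        // A single firing handles every record with event time <= watermark.
        while (!buffered.isEmpty() && buffered.firstKey() <= watermark) {
            long time = buffered.firstKey();
            int count = buffered.remove(time);
            for (int i = 0; i < count; i++) {
                processed.add(time);
            }
        }
        // Re-register for the new minimum, if any records are left.
        registeredTimer = buffered.isEmpty() ? null : buffered.firstKey();
        return processed;
    }

    int timerFirings() {
        return timerFirings;
    }

    Long registeredTimer() {
        return registeredTimer;
    }
}
```

With the event times from the comment, {1, 2, 5, 8, 9}, a watermark of 10 causes one firing instead of five, and the timer is then re-registered for the smallest remaining event time.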

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651380#comment-16651380
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

kl0u commented on a change in pull request #6776: [FLINK-9715][table] Support 
temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225459329
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state the collection of probe and build records to
+  * process on the next watermark. The idea is that between watermarks we collect those
+  * elements, and once we are sure that there will be no further updates we emit the correct
+  * result and clean up the state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is
+  * defined as older than the current watermark. The build side is cleaned up in a similar
+  * fashion, however we always keep at least one record - the latest one - even if it is past
+  * the last watermark.
+  *
+  * One more trick is how emitting results and cleaning up are triggered. This is achieved by
+  * registering timers for the keys. We could register a timer for every probe and build side
+  * element's event time (when the watermark exceeds this timer, that is when we emit and/or
+  * clean up the state). However, this would cause a huge number of registered timers. For
+  * example, with the following event times of accumulated probe records: {1, 2, 5, 8, 9}, if
+  * we received Watermark(10), it would trigger 5 separate timers for the same key. To avoid
+  * that, we always keep only a single registered timer for any given key, registered for the
+  * minimal value. Upon triggering it, we process all records with event times older than or
+  * equal to currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new 
RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into 
the left side `Row`.
+* We ca

[GitHub] yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges

2018-10-16 Thread GitBox
yanghua commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225460957
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -70,6 +72,17 @@ public String filterCharacters(String input) {
private final Map> histograms 
= new HashMap<>();
private final Map> meters = new 
HashMap<>();
 
+   private long maximumFramesize;
+
+   @Override
+   public void preStart() throws Exception {
+   if 
(getContext().system().settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
 
 Review comment:
   `AkkaRpcService` also uses this pattern; are you suggesting that I refactor it? 
According to the git history, it was implemented by @tillrohrmann.




[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651379#comment-16651379
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

kl0u commented on a change in pull request #6776: [FLINK-9715][table] Support 
temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225459775
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping in state the collection of probe and build records to
+  * process on the next watermark. The idea is that between watermarks we collect those
+  * elements, and once we are sure that there will be no further updates we emit the correct
+  * result and clean up the state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is
+  * defined as older than the current watermark. The build side is cleaned up in a similar
+  * fashion, however we always keep at least one record - the latest one - even if it is past
+  * the last watermark.
+  *
+  * One more trick is how emitting results and cleaning up are triggered. This is achieved by
+  * registering timers for the keys. We could register a timer for every probe and build side
+  * element's event time (when the watermark exceeds this timer, that is when we emit and/or
+  * clean up the state). However, this would cause a huge number of registered timers. For
+  * example, with the following event times of accumulated probe records: {1, 2, 5, 8, 9}, if
+  * we received Watermark(10), it would trigger 5 separate timers for the same key. To avoid
+  * that, we always keep only a single registered timer for any given key, registered for the
+  * minimal value. Upon triggering it, we process all records with event times older than or
+  * equal to currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new 
RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into 
the left side `Row`.
+* We ca

[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651382#comment-16651382
 ] 

ASF GitHub Bot commented on FLINK-10252:


yanghua commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225460957
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -70,6 +72,17 @@ public String filterCharacters(String input) {
private final Map> histograms 
= new HashMap<>();
private final Map> meters = new 
HashMap<>();
 
+   private long maximumFramesize;
+
+   @Override
+   public void preStart() throws Exception {
+   if 
(getContext().system().settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
 
 Review comment:
   `AkkaRpcService` also uses this pattern; are you suggesting that I refactor it? 
According to the git history, it was implemented by @tillrohrmann.




> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages smaller than the current {{akka.framesize}}. We 
> should check, similarly to FLINK-10251, whether the payload exceeds the maximum 
> framesize and fail fast if it does.
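
The fail-fast check requested in the issue description can be illustrated with a minimal sketch. It is not the code from the pull request: the class `FrameSizeCheckSketch` and its methods are hypothetical, plain Java serialization stands in for whatever serializer the actor system uses, and the maximum frame size is passed in as a parameter instead of being read from the Akka configuration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical sketch of a fail-fast size check before sending a message. */
final class FrameSizeCheckSketch {

    /** Serializes the payload with plain Java serialization (an assumption for this sketch). */
    static byte[] serialize(Serializable payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns the serialized payload, or fails immediately if it exceeds the frame size. */
    static byte[] serializeOrFail(Serializable payload, long maximumFramesize) {
        byte[] serialized = serialize(payload);
        if (serialized.length > maximumFramesize) {
            throw new IllegalStateException(
                "Payload of " + serialized.length
                    + " bytes exceeds the maximum frame size of "
                    + maximumFramesize + " bytes.");
        }
        return serialized;
    }
}
```

Failing at the sender makes the problem visible where it originates, rather than leaving the receiver waiting for a message that the transport silently refuses to deliver.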





[jira] [Created] (FLINK-10562) Relax (or document) table name constraints

2018-10-16 Thread Flavio Pompermaier (JIRA)
Flavio Pompermaier created FLINK-10562:
--

 Summary: Relax (or document) table name constraints
 Key: FLINK-10562
 URL: https://issues.apache.org/jira/browse/FLINK-10562
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.6.1
Reporter: Flavio Pompermaier


At the moment it's not possible to register a table whose name starts with a 
number (e.g. 1_test). Moreover, this constraint is not mentioned in the 
documentation.

I propose supporting some form of table name escaping, in order to enable more 
general scenarios such as names containing spaces (e.g. select * from 'my table').
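
To make the proposal concrete, here is a rough sketch of what such a check and escaping step could look like. Everything in it is an assumption made for illustration: the identifier rule (a letter or underscore followed by letters, digits or underscores), the backtick quoting scheme, and the class `TableNameSketch` itself are hypothetical and do not describe what Flink currently does.

```java
/** Hypothetical sketch of table name validation and escaping (not Flink's behaviour). */
final class TableNameSketch {

    /** Assumed rule for names that need no escaping. */
    static boolean isPlainIdentifier(String name) {
        return name.matches("[A-Za-z_][A-Za-z0-9_]*");
    }

    /** Assumed escaping scheme: wrap in backticks and double any embedded backtick. */
    static String escapeIfNeeded(String name) {
        if (isPlainIdentifier(name)) {
            return name;
        }
        return "`" + name.replace("`", "``") + "`";
    }
}
```

Under these assumptions, `1_test` and `my table` would both be rejected as plain identifiers but could still be used once escaped.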

Best,
Flavio

 





[GitHub] pnowojski commented on a change in pull request #6833: [FLINK-10537][network] Fix network small performance degradation after merging [FLINK-9913]

2018-10-16 Thread GitBox
pnowojski commented on a change in pull request #6833: [FLINK-10537][network] 
Fix network small performance degradation after merging [FLINK-9913]
URL: https://github.com/apache/flink/pull/6833#discussion_r225462543
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 ##
 @@ -92,20 +92,9 @@ public void serializeRecord(T record) throws IOException {
 */
@Override
public SerializationResult copyToBufferBuilder(BufferBuilder 
targetBuffer) {
-   boolean mustCommit = false;
-   if (lengthBuffer.hasRemaining()) {
 
 Review comment:
   I don't think it matters, and it would complicate the code.
   
   For records spanning multiple buffers (3 or more), the cost of 
calling `targetBuffer.append(lengthBuffer)` multiple times would be very small 
compared even to the cost of copying `32KB` of data. If you also account for the 
actual cost of serialising a record of that size, this becomes even less 
important. It could matter only if records are small, and then the opposite 
is true (as our benchmarks show).
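
The argument can be illustrated with a small, self-contained simulation. It is only a sketch: `SpanningCopySketch` is a hypothetical class that uses `java.nio.ByteBuffer` in place of Flink's `BufferBuilder`, and it counts how many of the unconditional length-header appends copy nothing when a record spans several target buffers.

```java
import java.nio.ByteBuffer;

/** Hypothetical simulation of copying a length-prefixed record into fixed-size buffers. */
final class SpanningCopySketch {

    /** Copies as many bytes as fit from src into target; returns the number of bytes copied. */
    static int append(ByteBuffer target, ByteBuffer src) {
        int n = Math.min(target.remaining(), src.remaining());
        for (int i = 0; i < n; i++) {
            target.put(src.get());
        }
        return n;
    }

    /**
     * Returns {buffers used, length appends that copied nothing} for a record with a
     * 4-byte length header and the given payload size.
     */
    static int[] copyRecord(int payloadSize, int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        ByteBuffer lengthBuffer = ByteBuffer.allocate(4);
        lengthBuffer.putInt(payloadSize);
        lengthBuffer.flip();
        ByteBuffer data = ByteBuffer.allocate(payloadSize);

        int buffers = 0;
        int noOpLengthAppends = 0;
        while (lengthBuffer.hasRemaining() || data.hasRemaining()) {
            ByteBuffer target = ByteBuffer.allocate(bufferSize);
            buffers++;
            // The header append is attempted for every buffer, without a hasRemaining() guard.
            if (append(target, lengthBuffer) == 0) {
                noOpLengthAppends++;
            }
            append(target, data);
        }
        return new int[] {buffers, noOpLengthAppends};
    }
}
```

For a 100,000-byte record and 32 KB buffers, the simulation uses four buffers and performs three header appends that copy nothing, next to roughly 100 KB of payload copying. A 10-byte record fits in one buffer and has no such appends.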




[GitHub] asfgit closed pull request #6792: [FLINK-10474][table] Don't translate IN/NOT_IN to JOIN with VALUES

2018-10-16 Thread GitBox
asfgit closed pull request #6792: [FLINK-10474][table] Don't translate 
IN/NOT_IN to JOIN with VALUES
URL: https://github.com/apache/flink/pull/6792
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index d740c3f1f99..564fb8648a2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -33,6 +33,7 @@ import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql._
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.sql2rel.SqlToRelConverter
 import org.apache.calcite.tools._
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -108,6 +109,15 @@ abstract class TableEnvironment(val config: TableConfig) {
   // registered external catalog names -> catalog
   private val externalCatalogs = new mutable.HashMap[String, ExternalCatalog]
 
+  // configuration for SqlToRelConverter
+  private[flink] lazy val sqlToRelConverterConfig: SqlToRelConverter.Config = {
+val calciteConfig = config.getCalciteConfig
+calciteConfig.getSqlToRelConverterConfig match {
+  case Some(c) => c
+  case None => getSqlToRelConverterConfig
+}
+  }
+
   /** Returns the table config to define the runtime behavior of the Table 
API. */
   def getConfig: TableConfig = config
 
@@ -118,6 +128,17 @@ abstract class TableEnvironment(val config: TableConfig) {
 case _ => null
   }
 
+  /**
+* Returns the SqlToRelConverter config.
+*/
+  protected def getSqlToRelConverterConfig: SqlToRelConverter.Config = {
+SqlToRelConverter.configBuilder()
+  .withTrimUnusedFields(false)
+  .withConvertTableAccess(false)
+  .withInSubQueryThreshold(Integer.MAX_VALUE)
+  .build()
+  }
+
   /**
 * Returns the operator table for this environment including a custom 
Calcite configuration.
 */
@@ -698,7 +719,8 @@ abstract class TableEnvironment(val config: TableConfig) {
 * @return The result of the query as Table
 */
   def sqlQuery(query: String): Table = {
-val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+val planner = new FlinkPlannerImpl(
+  getFrameworkConfig, getPlanner, getTypeFactory, sqlToRelConverterConfig)
 // parse the sql query
 val parsed = planner.parse(query)
 if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
@@ -758,7 +780,8 @@ abstract class TableEnvironment(val config: TableConfig) {
 * @param config The [[QueryConfig]] to use.
 */
   def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
-val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, 
getTypeFactory)
+val planner = new FlinkPlannerImpl(
+  getFrameworkConfig, getPlanner, getTypeFactory, sqlToRelConverterConfig)
 // parse the sql query
 val parsed = planner.parse(stmt)
 parsed match {
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
index accc628c914..b5d6c06e40f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.sql.SqlOperatorTable
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.sql2rel.SqlToRelConverter
 import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.util.Preconditions
 
@@ -72,6 +73,11 @@ class CalciteConfigBuilder {
 */
   private var replaceSqlParserConfig: Option[SqlParser.Config] = None
 
+  /**
+* Defines a configuration for SqlToRelConverter.
+*/
+  private var replaceSqlToRelConverterConfig: Option[SqlToRelConverter.Config] 
= None
+
   /**
 * Replaces the built-in normalization rule set with the given rule set.
 */
@@ -183,6 +189,15 @@ class CalciteConfigBuilder {
 this
   }
 
+  /**
+* Replaces the built-in SqlToRelConverter configuration with the given 
configuration.
+*/
+  def replaceSqlToRelConverterConfig(config: SqlToRelConverter.Config): 
CalciteConfigBuilder = {
+Preconditions.checkNotNull(confi

[jira] [Commented] (FLINK-10537) Network throughput performance regression after broadcast changes

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651388#comment-16651388
 ] 

ASF GitHub Bot commented on FLINK-10537:


pnowojski commented on a change in pull request #6833: [FLINK-10537][network] 
Fix network small performance degradation after merging [FLINK-9913]
URL: https://github.com/apache/flink/pull/6833#discussion_r225462543
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 ##
 @@ -92,20 +92,9 @@ public void serializeRecord(T record) throws IOException {
 */
@Override
public SerializationResult copyToBufferBuilder(BufferBuilder 
targetBuffer) {
-   boolean mustCommit = false;
-   if (lengthBuffer.hasRemaining()) {
 
 Review comment:
   I don't think it matters, and it would complicate the code.
   
   For records spanning multiple buffers (3 or more), the cost of 
calling `targetBuffer.append(lengthBuffer)` multiple times would be very small 
compared even to the cost of copying `32KB` of data. If you also account for the 
actual cost of serialising a record of that size, this becomes even less 
important. It could matter only if records are small, and then the opposite 
is true (as our benchmarks show).




> Network throughput performance regression after broadcast changes
> -
>
> Key: FLINK-10537
> URL: https://issues.apache.org/jira/browse/FLINK-10537
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> There is a slight network throughput regression introduced in: 
> https://issues.apache.org/jira/browse/FLINK-9913
> It is visible in the following benchmark:
> [http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=networkThroughput.1,100ms&env=2&revs=200&equid=off&quarts=on&extr=on]
> (drop in the chart that happened since 21st September.)





[jira] [Commented] (FLINK-10537) Network throughput performance regression after broadcast changes

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651387#comment-16651387
 ] 

ASF GitHub Bot commented on FLINK-10537:


pnowojski commented on a change in pull request #6833: [FLINK-10537][network] 
Fix network small performance degradation after merging [FLINK-9913]
URL: https://github.com/apache/flink/pull/6833#discussion_r225459139
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 ##
 @@ -127,6 +116,7 @@ public void reset() {
 
@Override
public void prune() {
+   serializationBuffer.clear();
 
 Review comment:
   Yes, I know that it will be called, and this is not a bug fix. I added this 
line when I was going through your PR trying to find any differences in 
behaviour vs the old code. I didn't expect this to be a performance issue and I 
still don't. However, when I was reverting the changes I decided to keep it. 
From a consistency point of view it seems better to call clear 
(`this.position = 0;`) before/during pruning the buffer 
(`serializationBuffer.pruneBuffer();`). Otherwise, for a brief moment after 
we exit this method, we might have `position` pointing to some non-existent 
place (a potential `ArrayIndexOutOfBound`).
   
   Maybe `serializationBuffer.pruneBuffer` should set the position to 
0? Or maybe I should split this change into a separate commit. As it is now, it 
suggests that it has something to do with performance (which I doubt).
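
The consistency concern can be shown with a toy buffer. This is a simplified, hypothetical model rather than Flink's serializer: the class `PrunableBufferSketch`, its initial size, and the rule that pruning swaps in a fresh array of that initial size are all assumptions made for the sketch.

```java
import java.util.Arrays;

/** Hypothetical toy buffer showing why clear() should accompany pruneBuffer(). */
final class PrunableBufferSketch {
    private static final int INITIAL_SIZE = 16;

    private byte[] buffer = new byte[INITIAL_SIZE];
    private int position = 0;

    void write(byte[] data) {
        if (position + data.length > buffer.length) {
            // Grow the backing array to fit the new data.
            buffer = Arrays.copyOf(buffer, Math.max(buffer.length * 2, position + data.length));
        }
        System.arraycopy(data, 0, buffer, position, data.length);
        position += data.length;
    }

    /** Resets the write position, as `this.position = 0;` does in the comment above. */
    void clear() {
        position = 0;
    }

    /** Shrinks an oversized backing array; deliberately leaves the position untouched. */
    void pruneBuffer() {
        if (buffer.length > INITIAL_SIZE) {
            buffer = new byte[INITIAL_SIZE];
        }
    }

    /** True if the position still points inside the backing array. */
    boolean positionIsValid() {
        return position <= buffer.length;
    }
}
```

Pruning after writing 100 bytes leaves the position at 100 in a 16-byte array, while calling `clear()` first keeps the two in agreement.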




> Network throughput performance regression after broadcast changes
> -
>
> Key: FLINK-10537
> URL: https://issues.apache.org/jira/browse/FLINK-10537
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> There is a slight network throughput regression introduced in: 
> https://issues.apache.org/jira/browse/FLINK-9913
> It is visible in the following benchmark:
> [http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=networkThroughput.1,100ms&env=2&revs=200&equid=off&quarts=on&extr=on]
> (drop in the chart that happened since 21st September.)





[jira] [Commented] (FLINK-10474) Don't translate IN with Literals to JOIN with VALUES for streaming queries

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651385#comment-16651385
 ] 

ASF GitHub Bot commented on FLINK-10474:


asfgit closed pull request #6792: [FLINK-10474][table] Don't translate 
IN/NOT_IN to JOIN with VALUES
URL: https://github.com/apache/flink/pull/6792
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index d740c3f1f99..564fb8648a2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -33,6 +33,7 @@ import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql._
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.sql2rel.SqlToRelConverter
 import org.apache.calcite.tools._
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -108,6 +109,15 @@ abstract class TableEnvironment(val config: TableConfig) {
   // registered external catalog names -> catalog
   private val externalCatalogs = new mutable.HashMap[String, ExternalCatalog]
 
+  // configuration for SqlToRelConverter
+  private[flink] lazy val sqlToRelConverterConfig: SqlToRelConverter.Config = {
+val calciteConfig = config.getCalciteConfig
+calciteConfig.getSqlToRelConverterConfig match {
+  case Some(c) => c
+  case None => getSqlToRelConverterConfig
+}
+  }
+
   /** Returns the table config to define the runtime behavior of the Table API. */
   def getConfig: TableConfig = config
 
@@ -118,6 +128,17 @@ abstract class TableEnvironment(val config: TableConfig) {
 case _ => null
   }
 
+  /**
+* Returns the SqlToRelConverter config.
+*/
+  protected def getSqlToRelConverterConfig: SqlToRelConverter.Config = {
+SqlToRelConverter.configBuilder()
+  .withTrimUnusedFields(false)
+  .withConvertTableAccess(false)
+  .withInSubQueryThreshold(Integer.MAX_VALUE)
+  .build()
+  }
+
   /**
 * Returns the operator table for this environment including a custom Calcite configuration.
 */
@@ -698,7 +719,8 @@ abstract class TableEnvironment(val config: TableConfig) {
 * @return The result of the query as Table
 */
   def sqlQuery(query: String): Table = {
-val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+val planner = new FlinkPlannerImpl(
+  getFrameworkConfig, getPlanner, getTypeFactory, sqlToRelConverterConfig)
 // parse the sql query
 val parsed = planner.parse(query)
 if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
@@ -758,7 +780,8 @@ abstract class TableEnvironment(val config: TableConfig) {
 * @param config The [[QueryConfig]] to use.
 */
   def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
-val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+val planner = new FlinkPlannerImpl(
+  getFrameworkConfig, getPlanner, getTypeFactory, sqlToRelConverterConfig)
 // parse the sql query
 val parsed = planner.parse(stmt)
 parsed match {
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
index accc628c914..b5d6c06e40f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.plan.RelOptRule
 import org.apache.calcite.sql.SqlOperatorTable
 import org.apache.calcite.sql.parser.SqlParser
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable
+import org.apache.calcite.sql2rel.SqlToRelConverter
 import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.util.Preconditions
 
@@ -72,6 +73,11 @@ class CalciteConfigBuilder {
 */
   private var replaceSqlParserConfig: Option[SqlParser.Config] = None
 
+  /**
+* Defines a configuration for SqlToRelConverter.
+*/
+  private var replaceSqlToRelConverterConfig: Option[SqlToRelConverter.Config] = None
+
   /**
 * Replaces the built-in normalization rule set with the given rule set.
 */
@@ -183,6 +189,15 @@ class CalciteConfigBuilder {

[jira] [Closed] (FLINK-10474) Don't translate IN with Literals to JOIN with VALUES for streaming queries

2018-10-16 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-10474.
-
   Resolution: Fixed
Fix Version/s: 1.7.0

Fixed for 1.7.0 b40b27ab8541bac07f5b9b461c39cc784835f191

> Don't translate IN with Literals to JOIN with VALUES for streaming queries
> --
>
> Key: FLINK-10474
> URL: https://issues.apache.org/jira/browse/FLINK-10474
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> IN predicates with literals are translated to JOIN with VALUES if the number 
> of elements in the IN clause exceeds a certain threshold. This should not be 
> done, because a streaming join is very heavy and materializes both inputs 
> (which is fine for the VALUES input but not for the other).
> There are two ways to solve this:
>  # don't translate IN to a JOIN at all
>  # translate it to a JOIN but have a special join strategy if one input is 
> bounded and final (non-updating)
> Option 1. should be easy to do, option 2. requires much more effort.
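
To make option 1 concrete, here is a minimal sketch of the decision that such a threshold controls. It is not Calcite's or Flink's code: the class `InListPlanner` and its methods are hypothetical, and the exact boundary (strictly greater than the threshold) is an assumption taken from the word "exceeds" in the description. With the threshold raised to `Integer.MAX_VALUE`, the JOIN rewrite can never be chosen and the predicate stays a plain disjunction.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only; not part of Calcite or Flink.
final class InListPlanner {
    // Number of IN-list literals above which a JOIN with VALUES would be planned.
    private final int inSubQueryThreshold;

    InListPlanner(int inSubQueryThreshold) {
        this.inSubQueryThreshold = inSubQueryThreshold;
    }

    /** Returns true if the IN list would be rewritten as a JOIN with VALUES. */
    boolean translatesToJoin(List<Integer> literals) {
        // Assumption: "exceeds" means strictly greater than the threshold.
        return literals.size() > inSubQueryThreshold;
    }

    /** Renders the IN predicate as a chain of equality comparisons joined by OR. */
    String toDisjunction(String column, List<Integer> literals) {
        return literals.stream()
            .map(v -> column + " = " + v)
            .collect(Collectors.joining(" OR "));
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3);
        InListPlanner planner = new InListPlanner(Integer.MAX_VALUE);
        System.out.println(planner.translatesToJoin(ids));    // prints "false"
        System.out.println(planner.toDisjunction("id", ids)); // prints "id = 1 OR id = 2 OR id = 3"
    }
}
```

The trade-off is that a very long IN list becomes a very long filter expression, which is still cheaper for a streaming query than materializing the non-VALUES input of a join.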





[GitHub] pnowojski commented on a change in pull request #6833: [FLINK-10537][network] Fix network small performance degradation after merging [FLINK-9913]

2018-10-16 Thread GitBox
pnowojski commented on a change in pull request #6833: [FLINK-10537][network] 
Fix network small performance degradation after merging [FLINK-9913]
URL: https://github.com/apache/flink/pull/6833#discussion_r225459139
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 ##
 @@ -127,6 +116,7 @@ public void reset() {
 
@Override
public void prune() {
+   serializationBuffer.clear();
 
 Review comment:
   Yes, I know that it will be called and that it's not a bug fix. I added this 
line when I was going through your PR trying to find any differences in 
behaviour vs the old code. I didn't expect this to be a performance issue and I 
still don't. However, when I was reverting the changes I decided to keep it. 
From a consistency point of view it seems better to call clear 
(`this.position = 0;`) before/during pruning the buffer 
(`serializationBuffer.pruneBuffer();`). Otherwise, for a brief moment when 
we exit this method, we might have `position` pointing to some non-existent 
place (potential `ArrayIndexOutOfBoundsException`).
   
   Maybe `serializationBuffer.pruneBuffer` should be setting the position to 
0? Or maybe I should split this change into a separate commit. As it is now, it 
suggests that it has something to do with performance (which I doubt).
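
The consistency concern above can be sketched with a toy buffer. This is not Flink's serializer: `PrunableBuffer`, its threshold, and `positionIsValid` are hypothetical names chosen for illustration. It only shows that shrinking the backing array without resetting the write position can leave the position beyond the end of the array.

```java
import java.util.Arrays;

// Hypothetical toy buffer; not Flink's actual serialization buffer.
final class PrunableBuffer {
    // Assumed size that the backing array is shrunk back to when pruning.
    private static final int PRUNE_THRESHOLD = 8;

    private byte[] data = new byte[4];
    private int position;

    /** Appends one byte, doubling the backing array when it is full. */
    void write(byte b) {
        if (position == data.length) {
            data = Arrays.copyOf(data, data.length * 2);
        }
        data[position++] = b;
    }

    /** Resets the write position, as `this.position = 0;` does in the comment. */
    void clear() {
        position = 0;
    }

    /** Replaces an oversized backing array with a small one; does not touch position. */
    void pruneBuffer() {
        if (data.length > PRUNE_THRESHOLD) {
            data = new byte[PRUNE_THRESHOLD];
        }
    }

    /** True if the position still points inside (or at the end of) the array. */
    boolean positionIsValid() {
        return position <= data.length;
    }

    public static void main(String[] args) {
        PrunableBuffer buffer = new PrunableBuffer();
        for (int i = 0; i < 20; i++) {
            buffer.write((byte) i);
        }
        buffer.pruneBuffer();                         // prune without clear
        System.out.println(buffer.positionIsValid()); // prints "false"
    }
}
```

Calling `clear()` before `pruneBuffer()`, or resetting the position inside `pruneBuffer()` itself, removes the window in which the position is stale.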


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6698: [FLINK-8581][network] Move flushing remote subpartitions from OutputFlusher to netty

2018-10-16 Thread GitBox
pnowojski commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r225464137
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -268,6 +291,25 @@ public void flushAll() {
}
}
 
+   @Override
+   public void flushAllLocal() {
+   for (ResultSubpartition localSubpartition : localSubpartitions) {
+   localSubpartition.flush();
+   }
+   }
+
+   @Override
+   public void setFlushTimeout(long flushTimeout) {
+   checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice");
+   for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) {
+   checkState(subpartition.isLocal().isPresent());
+   checkState(!subpartition.isLocal().get());
+   subpartition.registerPeriodicFlush(flushTimeout);
+   }
+   remoteSubpartitionsMissingPeriodicFlushes.clear();
 
 Review comment:
   In that case I would have to re-evaluate this. Adding extra synchronisation 
would further complicate the code and could cause regressions.




[jira] [Commented] (FLINK-8581) Improve performance for low latency network

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651390#comment-16651390
 ] 

ASF GitHub Bot commented on FLINK-8581:
---

pnowojski commented on a change in pull request #6698: [FLINK-8581][network] 
Move flushing remote subpartitions from OutputFlusher to netty
URL: https://github.com/apache/flink/pull/6698#discussion_r225464137
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -268,6 +291,25 @@ public void flushAll() {
}
}
 
+   @Override
+   public void flushAllLocal() {
+   for (ResultSubpartition localSubpartition : localSubpartitions) {
+   localSubpartition.flush();
+   }
+   }
+
+   @Override
+   public void setFlushTimeout(long flushTimeout) {
+   checkState(!this.flushTimeout.isPresent(), "Flush timeout can not be set twice");
+   for (ResultSubpartition subpartition: remoteSubpartitionsMissingPeriodicFlushes) {
+   checkState(subpartition.isLocal().isPresent());
+   checkState(!subpartition.isLocal().get());
+   subpartition.registerPeriodicFlush(flushTimeout);
+   }
+   remoteSubpartitionsMissingPeriodicFlushes.clear();
 
 Review comment:
   In that case I would have to re-evaluate this. Adding extra synchronisation 
would further complicate the code and could cause regressions.




> Improve performance for low latency network
> ---
>
> Key: FLINK-8581
> URL: https://issues.apache.org/jira/browse/FLINK-8581
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>






[GitHub] twalthr commented on issue #6791: [FLINK-8865][sql-client] Add CLI query code completion in SQL Client

2018-10-16 Thread GitBox
twalthr commented on issue #6791: [FLINK-8865][sql-client] Add CLI query code 
completion in SQL Client
URL: https://github.com/apache/flink/pull/6791#issuecomment-430168672
 
 
   Thank you @xueyumusic. I found a couple of issues regarding escaping. I will 
fix them and merge this.




[jira] [Commented] (FLINK-8865) Add CLI query code completion in SQL Client

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651392#comment-16651392
 ] 

ASF GitHub Bot commented on FLINK-8865:
---

twalthr commented on issue #6791: [FLINK-8865][sql-client] Add CLI query code 
completion in SQL Client
URL: https://github.com/apache/flink/pull/6791#issuecomment-430168672
 
 
   Thank you @xueyumusic. I found a couple of issues regarding escaping. I will 
fix them and merge this.




> Add CLI query code completion in SQL Client
> ---
>
> Key: FLINK-8865
> URL: https://issues.apache.org/jira/browse/FLINK-8865
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: xueyu
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
> Calcite already offers a code completion functionality. It would be great if 
> we could expose this feature also through the SQL CLI Client.





[jira] [Commented] (FLINK-10562) Relax (or document) table name constraints

2018-10-16 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651400#comment-16651400
 ] 

Timo Walther commented on FLINK-10562:
--

[~f.pompermaier] Escaping in SQL queries is already possible using backticks:

{code}
select * from `my table`
{code}

> Relax (or document) table name constraints
> --
>
> Key: FLINK-10562
> URL: https://issues.apache.org/jira/browse/FLINK-10562
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.1
>Reporter: Flavio Pompermaier
>Priority: Minor
>
> At the moment it's not possible to register a table whose name starts with a 
> number (e.g. 1_test). Moreover this constraint is not reported in the 
> documentation.
> I propose to enable table name escaping somehow in order to enable more 
> general scenarios like those having spaces in between (e.g. select * from 'my 
> table' ).
> Best,
> Flavio
>  





[GitHub] zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225473371
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -109,7 +122,17 @@ public void onReceive(Object message) {
}
} else if (message instanceof CreateDump) {

MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-   getSender().tell(dump, getSelf());
+
+   int realMsgSize = dump.serializedMetrics.length;
+
+   if (realMsgSize > maximumFramesize) {
+   String overSizeErrorMsg = "The metric dump message size : " + realMsgSize
+   + " exceeds the maximum akka framesize : " + maximumFramesize + ".";
+   LOG.error(overSizeErrorMsg);
+   getSender().tell(new Status.Failure(new IOException(overSizeErrorMsg)), getSelf());
 
 Review comment:
   well then don't pass incomplete data. metrics are serialized one at a time, 
so you can just remember during serialization the last metrics that fit into 
the buffer, and once you go over the limit go back to that.
   You can also use other interesting heuristics like dropping histograms first 
(since they are significantly larger).
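
The "remember the last metrics that fit" idea can be sketched as follows. This is a simplified illustration, not the `MetricDumpSerialization` code: `BoundedMetricDump`, its `Result` type, and the newline-delimited string encoding are all assumptions made for the example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; the real serializer writes typed metrics, not strings.
final class BoundedMetricDump {

    /** Serialized bytes plus the number of metrics that made it into the dump. */
    static final class Result {
        final byte[] bytes;
        final int included;

        Result(byte[] bytes, int included) {
            this.bytes = bytes;
            this.included = included;
        }
    }

    /** Serializes metrics one at a time and rolls back the first one that overflows. */
    static Result serialize(List<String> metrics, int maxBytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int included = 0;
        for (String metric : metrics) {
            // Remember the size before this metric so we can go back to it.
            int lastGoodSize = out.size();
            byte[] encoded = (metric + "\n").getBytes(StandardCharsets.UTF_8);
            out.write(encoded, 0, encoded.length);
            if (out.size() > maxBytes) {
                // Over the limit: keep only the bytes written before this metric.
                return new Result(Arrays.copyOf(out.toByteArray(), lastGoodSize), included);
            }
            included++;
        }
        return new Result(out.toByteArray(), included);
    }

    public static void main(String[] args) {
        Result result = serialize(Arrays.asList("a=1", "b=2", "c=3"), 9);
        System.out.println(result.included + " metrics, " + result.bytes.length + " bytes");
    }
}
```

One way to approximate the "drop histograms first" heuristic with this shape is to order the input so that the largest metric types come last; they are then the first to be cut off when the limit is reached.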




[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651411#comment-16651411
 ] 

ASF GitHub Bot commented on FLINK-10252:


zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225473371
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -109,7 +122,17 @@ public void onReceive(Object message) {
}
} else if (message instanceof CreateDump) {

MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-   getSender().tell(dump, getSelf());
+
+   int realMsgSize = dump.serializedMetrics.length;
+
+   if (realMsgSize > maximumFramesize) {
+   String overSizeErrorMsg = "The metric dump message size : " + realMsgSize
+   + " exceeds the maximum akka framesize : " + maximumFramesize + ".";
+   LOG.error(overSizeErrorMsg);
+   getSender().tell(new Status.Failure(new IOException(overSizeErrorMsg)), getSelf());
 
 Review comment:
   well then don't pass incomplete data. metrics are serialized one at a time, 
so you can just remember during serialization the last metrics that fit into 
the buffer, and once you go over the limit go back to that.
   You can also use other interesting heuristics like dropping histograms first 
(since they are significantly larger).




> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages of a smaller size than the current {{akka.framesize}}. We 
> should check, similarly to FLINK-10251, whether the payload exceeds the maximum 
> framesize and fail fast if it does.





[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651417#comment-16651417
 ] 

ASF GitHub Bot commented on FLINK-10252:


zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225473977
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -70,6 +72,17 @@ public String filterCharacters(String input) {
private final Map> histograms 
= new HashMap<>();
private final Map> meters = new 
HashMap<>();
 
+   private long maximumFramesize;
+
+   @Override
+   public void preStart() throws Exception {
+   if (getContext().system().settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
 
 Review comment:
   keep it then, but also copy the comment for the else branch since it is 
important.




> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages of a smaller size than the current {{akka.framesize}}. We 
> should check, similarly to FLINK-10251, whether the payload exceeds the maximum 
> framesize and fail fast if it does.





[GitHub] zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225473977
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 ##
 @@ -70,6 +72,17 @@ public String filterCharacters(String input) {
private final Map> histograms 
= new HashMap<>();
private final Map> meters = new 
HashMap<>();
 
+   private long maximumFramesize;
+
+   @Override
+   public void preStart() throws Exception {
+   if (getContext().system().settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
 
 Review comment:
   keep it then, but also copy the comment for the else branch since it is 
important.




[GitHub] zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225474296
 
 

 ##
 File path: 
flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
 ##
 @@ -285,6 +286,13 @@ object AkkaUtils {
 | throughput = $akkaThroughput
 |   }
 | }
+|
+| remote {
+|   netty.tcp {
+| maximum-frame-size = $akkaMaximumFrameSize
 
 Review comment:
   does this change imply that `AkkaOptions.FRAMESIZE` is currently broken?




[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651419#comment-16651419
 ] 

ASF GitHub Bot commented on FLINK-10252:


zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225474296
 
 

 ##
 File path: 
flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
 ##
 @@ -285,6 +286,13 @@ object AkkaUtils {
 | throughput = $akkaThroughput
 |   }
 | }
+|
+| remote {
+|   netty.tcp {
+| maximum-frame-size = $akkaMaximumFrameSize
 
 Review comment:
   does this change imply that `AkkaOptions.FRAMESIZE` is currently broken?




> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages of a smaller size than the current {{akka.framesize}}. We 
> should check, similarly to FLINK-10251, whether the payload exceeds the maximum 
> framesize and fail fast if it does.





[GitHub] zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges

2018-10-16 Thread GitBox
zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225474296
 
 

 ##
 File path: 
flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
 ##
 @@ -285,6 +286,13 @@ object AkkaUtils {
 | throughput = $akkaThroughput
 |   }
 | }
+|
+| remote {
+|   netty.tcp {
+| maximum-frame-size = $akkaMaximumFrameSize
 
 Review comment:
   does this change imply that `AkkaOptions.FRAMESIZE` is currently broken, 
since it wasn't passed to akka?




[jira] [Commented] (FLINK-10252) Handle oversized metric messges

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651420#comment-16651420
 ] 

ASF GitHub Bot commented on FLINK-10252:


zentol commented on a change in pull request #6850: [FLINK-10252] Handle 
oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225474296
 
 

 ##
 File path: 
flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
 ##
 @@ -285,6 +286,13 @@ object AkkaUtils {
 | throughput = $akkaThroughput
 |   }
 | }
+|
+| remote {
+|   netty.tcp {
+| maximum-frame-size = $akkaMaximumFrameSize
 
 Review comment:
   does this change imply that `AkkaOptions.FRAMESIZE` is currently broken, 
since it wasn't passed to akka?




> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only 
> send messages of a smaller size than the current {{akka.framesize}}. We 
> should check, similarly to FLINK-10251, whether the payload exceeds the maximum 
> framesize and fail fast if it does.





[jira] [Created] (FLINK-10563) Expose shaded Presto S3 filesystem under "s3p" scheme

2018-10-16 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-10563:


 Summary: Expose shaded Presto S3 filesystem under "s3p" scheme
 Key: FLINK-10563
 URL: https://issues.apache.org/jira/browse/FLINK-10563
 Project: Flink
  Issue Type: Improvement
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 1.7.0


Currently, you can't use the shaded Hadoop S3 filesystem and the Presto S3 
filesystem at the same time. If we exposed the Presto filesystem under an 
additional scheme, we would enable using both at the same time.
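
As a rough illustration of why a second scheme removes the conflict, the sketch below maps URI schemes to implementations. It is not Flink's `FileSystem` factory mechanism: `SchemeRegistry` and the factory names `"hadoop-s3"` and `"presto-s3"` are hypothetical. The point is only that a registry keyed by scheme can hold one implementation per scheme, so two implementations need two distinct schemes.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry; not Flink's actual file system loading code.
final class SchemeRegistry {
    private final Map<String, String> factoriesByScheme = new HashMap<>();

    /** Registers a factory name for a scheme; a scheme can have only one factory. */
    void register(String scheme, String factoryName) {
        if (factoriesByScheme.putIfAbsent(scheme, factoryName) != null) {
            throw new IllegalStateException("Scheme already registered: " + scheme);
        }
    }

    /** Looks up the factory responsible for the scheme of the given URI. */
    String factoryFor(URI uri) {
        String factory = factoriesByScheme.get(uri.getScheme());
        if (factory == null) {
            throw new IllegalArgumentException("No factory for scheme: " + uri.getScheme());
        }
        return factory;
    }

    public static void main(String[] args) {
        SchemeRegistry registry = new SchemeRegistry();
        registry.register("s3", "hadoop-s3");
        registry.register("s3p", "presto-s3");
        System.out.println(registry.factoryFor(URI.create("s3://bucket/key")));  // prints "hadoop-s3"
        System.out.println(registry.factoryFor(URI.create("s3p://bucket/key"))); // prints "presto-s3"
    }
}
```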





[GitHub] zhijiangW commented on a change in pull request #6833: [FLINK-10537][network] Fix network small performance degradation after merging [FLINK-9913]

2018-10-16 Thread GitBox
zhijiangW commented on a change in pull request #6833: [FLINK-10537][network] 
Fix network small performance degradation after merging [FLINK-9913]
URL: https://github.com/apache/flink/pull/6833#discussion_r225475687
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 ##
 @@ -127,6 +116,7 @@ public void reset() {
 
@Override
public void prune() {
+   serializationBuffer.clear();
 
 Review comment:
   Although I cannot understand why it would affect the performance in theory, 
I agree with your point, from the consistency point of view, to keep this logic.
   
   Maybe it would be better to set the position to 0 inside 
`serializationBuffer.pruneBuffer`. It's up to you to decide. :)




[jira] [Commented] (FLINK-10537) Network throughput performance regression after broadcast changes

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651425#comment-16651425
 ] 

ASF GitHub Bot commented on FLINK-10537:


zhijiangW commented on a change in pull request #6833: [FLINK-10537][network] 
Fix network small performance degradation after merging [FLINK-9913]
URL: https://github.com/apache/flink/pull/6833#discussion_r225475687
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 ##
 @@ -127,6 +116,7 @@ public void reset() {
 
@Override
public void prune() {
+   serializationBuffer.clear();
 
 Review comment:
   Although I cannot understand why it would affect the performance in theory, 
I agree with your point, from the consistency point of view, to keep this logic.
   
   Maybe it would be better to set the position to 0 inside 
`serializationBuffer.pruneBuffer`. It's up to you to decide. :)




> Network throughput performance regression after broadcast changes
> -
>
> Key: FLINK-10537
> URL: https://issues.apache.org/jira/browse/FLINK-10537
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.7.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> There is a slight network throughput regression introduced in: 
> https://issues.apache.org/jira/browse/FLINK-9913
> It is visible in the following benchmark:
> [http://codespeed.dak8s.net:8000/timeline/#/?exe=1&ben=networkThroughput.1,100ms&env=2&revs=200&equid=off&quarts=on&extr=on]
> (drop in the chart that happened since 21st September.)





[GitHub] pnowojski commented on a change in pull request #6776: [FLINK-9715][table] Support temporal join with event time

2018-10-16 Thread GitBox
pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225475981
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping a collection of probe and build records in state, to be
+  * processed on the next watermark. The idea is that between watermarks we are collecting those
+  * elements, and once we are sure that there will be no updates we emit the correct result and
+  * clean up the state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older than the current watermark. The build side is also cleaned up in a similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how emitting results and cleaning up are triggered. This is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when the watermark exceeds this timer, that's when we are emitting
+  * and/or cleaning up the state). However, this would cause a huge number of registered timers.
+  * For example, with the following event times of accumulated probe records: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that,
+  * we always keep only a single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older than or equal to
+  * currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`.
+* We cannot use List to accumulate Rows, because we need efficient deletes of the oldest rows.
+*
+* TODO: this could be OrderedMultiMap[JLong, Row] indexed by row's timestamp, to avoid
+* full map traversals (if we have lots of rows on the sta
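
The single-timer-per-key idea described in the Scaladoc above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the pull request: `SingleTimerBuffer`, `add`, `onWatermark`, `registered` and `timersFired` are hypothetical names, and plain Scala collections stand in for Flink keyed state and the timer service. The sketch fires the timer once the watermark is greater than or equal to the registered time.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the state kept for ONE key; not Flink API.
final class SingleTimerBuffer {
  // Event times of buffered probe-side records.
  private val pending = mutable.ArrayBuffer.empty[Long]
  // At most one timer is registered at any time; None means "no timer".
  private var registeredTimer: Option[Long] = None
  // Number of timer firings, for comparison with one timer per record.
  var timersFired: Int = 0

  def registered: Option[Long] = registeredTimer

  // Buffer a record and keep the single timer at the minimal event time.
  def add(eventTime: Long): Unit = {
    pending += eventTime
    if (registeredTimer.forall(eventTime < _)) registeredTimer = Some(eventTime)
  }

  // Advance the watermark; returns the event times processed by this firing.
  def onWatermark(watermark: Long): List[Long] = registeredTimer match {
    case Some(t) if t <= watermark =>
      timersFired += 1
      // Process every record at or before the watermark in one go.
      val (ready, later) = pending.partition(_ <= watermark)
      pending.clear()
      pending ++= later
      // Re-register for the minimal remaining event time, if any.
      registeredTimer = if (pending.isEmpty) None else Some(pending.min)
      ready.sorted.toList
    case _ => Nil
  }
}
```

With the event times {1, 2, 5, 8, 9} from the comment, a single call to `onWatermark(10)` on this sketch handles all five records in one firing and leaves no timer registered, instead of firing five separate timers for the same key.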

[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651426#comment-16651426
 ] 

ASF GitHub Bot commented on FLINK-9715:
---

pnowojski commented on a change in pull request #6776: [FLINK-9715][table] 
Support temporal join with event time
URL: https://github.com/apache/flink/pull/6776#discussion_r225475981
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
 ##
 @@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.join
+
+import java.lang.{Long => JLong}
+import java.util
+import java.util.Comparator
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
+import org.apache.flink.streaming.api.SimpleTimerService
+import org.apache.flink.streaming.api.operators._
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.CRowWrappingCollector
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Preconditions.checkState
+
+import scala.collection.JavaConversions._
+
+/**
+  * This operator works by keeping a collection of probe and build records in state, to be
+  * processed on the next watermark. The idea is that between watermarks we are collecting those
+  * elements, and once we are sure that there will be no updates we emit the correct result and
+  * clean up the state.
+  *
+  * Cleaning up the state drops all of the "old" values from the probe side, where "old" is defined
+  * as older than the current watermark. The build side is also cleaned up in a similar fashion,
+  * however we always keep at least one record - the latest one - even if it's past the last
+  * watermark.
+  *
+  * One more trick is how emitting results and cleaning up are triggered. This is achieved
+  * by registering timers for the keys. We could register a timer for every probe and build
+  * side element's event time (when the watermark exceeds this timer, that's when we are emitting
+  * and/or cleaning up the state). However, this would cause a huge number of registered timers.
+  * For example, with the following event times of accumulated probe records: {1, 2, 5, 8, 9}, if we
+  * had received Watermark(10), it would trigger 5 separate timers for the same key. To avoid that,
+  * we always keep only a single registered timer for any given key, registered for the minimal
+  * value. Upon triggering it, we process all records with event times older than or equal to
+  * currentWatermark.
+  */
+class TemporalRowtimeJoin(
+leftType: TypeInformation[Row],
+rightType: TypeInformation[Row],
+genJoinFuncName: String,
+genJoinFuncCode: String,
+queryConfig: StreamQueryConfig,
+leftTimeAttribute: Int,
+rightTimeAttribute: Int)
+  extends AbstractStreamOperator[CRow]
+  with TwoInputStreamOperator[CRow, CRow, CRow]
+  with Triggerable[Any, VoidNamespace]
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
+
+  private val NEXT_LEFT_INDEX_STATE_NAME = "next-index"
+  private val LEFT_STATE_NAME = "left"
+  private val RIGHT_STATE_NAME = "right"
+  private val REGISTERED_TIMER_STATE_NAME = "timer"
+  private val TIMERS_STATE_NAME = "timers"
+
+  private val rightRowtimeComparator = new RowtimeComparator(rightTimeAttribute)
+
+  /**
+* Incremental index generator for `leftState`'s keys.
+*/
+  private var nextLeftIndex: ValueState[JLong] = _
+
+  /**
+* Mapping from artificial row index (generated by `nextLeftIndex`) into the left side `Row`.
+* 

[jira] [Updated] (FLINK-10563) Expose shaded Presto S3 filesystem under "s3p" scheme

2018-10-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10563:
---
Labels: pull-request-available  (was: )

> Expose shaded Presto S3 filesystem under "s3p" scheme
> -
>
> Key: FLINK-10563
> URL: https://issues.apache.org/jira/browse/FLINK-10563
> Project: Flink
> Issue Type: Improvement
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, you can't use the shaded Hadoop S3 filesystem and the Presto S3 
> filesystem at the same time. If we exposed the Presto filesystem under an 
> additional scheme, we would enable using both at the same time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10563) Expose shaded Presto S3 filesystem under "s3p" scheme

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651430#comment-16651430
 ] 

ASF GitHub Bot commented on FLINK-10563:


aljoscha opened a new pull request #6855: [FLINK-10563] Expose shaded Presto S3 
filesystem under "s3p" scheme
URL: https://github.com/apache/flink/pull/6855
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Expose shaded Presto S3 filesystem under "s3p" scheme
> -
>
> Key: FLINK-10563
> URL: https://issues.apache.org/jira/browse/FLINK-10563
> Project: Flink
> Issue Type: Improvement
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, you can't use the shaded Hadoop S3 filesystem and the Presto S3 
> filesystem at the same time. If we exposed the Presto filesystem under an 
> additional scheme, we would enable using both at the same time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] aljoscha opened a new pull request #6855: [FLINK-10563] Expose shaded Presto S3 filesystem under "s3p" scheme

2018-10-16 Thread GitBox
aljoscha opened a new pull request #6855: [FLINK-10563] Expose shaded Presto S3 
filesystem under "s3p" scheme
URL: https://github.com/apache/flink/pull/6855
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

