[GitHub] flink pull request #3539: [FLINK-4256] Flip1: fine grained recovery
GitHub user shuai-xu opened a pull request:

    https://github.com/apache/flink/pull/3539

[FLINK-4256] Flip1: fine grained recovery

This is an informal PR for version 1 of the FLIP-1 implementation. It enables restarting only the minimal set of pipelined-connected executions when a task fails, instead of the whole execution graph.

Main changes:
1. ExecutionGraph no longer manages failover; it only records the number of finished JobVertices and turns to FINISHED when all vertices have finished (maybe later the FailoverCoordinator will take over this as well). Its state can now only be CREATED, RUNNING, FAILED, FINISHED or SUSPENDED.
2. FailoverCoordinator now manages failover. It generates several FailoverRegions when the execution graph (EG) is attached and listens for execution failures. When an execution fails, it finds the corresponding FailoverRegion to carry out the failover.
3. When the JobManager needs the EG to be canceled or failed, the EG also notifies the FailoverCoordinator. The FailoverCoordinator notifies all FailoverRegions to cancel their executions, and once all executions are canceled, it notifies the EG to turn to CANCELED or FAILED.
4. The FailoverCoordinator has several states: RUNNING, FAILING, CANCELLING, FAILED and CANCELED.
5. A FailoverRegion contains the minimal set of pipelined-connected executions and manages their failover.
6. A FailoverRegion has the states CREATED, RUNNING, CANCELLING and CANCELLED.
7. One FailoverRegion may be the successor or predecessor of others. When a preceding region fails over, all of its succeeding regions must fail over too. The succeeding regions only reset their executions and wait for the preceding region to start them once it finishes; the preceding region waits for its succeeding regions to be CREATED before scheduling again.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shuai-xu/flink jira-4256

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3539.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3539

commit 363f1536838064edbdd5f39e41f3f19f6c511fc4
Author: shuai.xus
Date: 2017-03-15T03:36:11Z

    [FLINK-4256] Flip1: fine grained recovery

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
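The region computation sketched in points 2 and 5 above can be illustrated with a small, self-contained example. This is not the pull request's actual code and the class names are made up; it only shows the core idea that vertices connected through pipelined edges end up in one failover region, while blocking edges start a new one, so a task failure only restarts its own region.

```java
import java.util.*;

/** Toy model: group vertices that are connected through PIPELINED edges
 *  into "failover regions"; BLOCKING edges separate regions. */
public class FailoverRegionSketch {

    enum ExchangeType { PIPELINED, BLOCKING }

    record Edge(int source, int target, ExchangeType type) {}

    /** Returns a mapping from vertex id to region id. */
    static Map<Integer, Integer> computeRegions(int numVertices, List<Edge> edges) {
        // adjacency restricted to pipelined edges, treated as undirected
        Map<Integer, List<Integer>> pipelined = new HashMap<>();
        for (Edge e : edges) {
            if (e.type() == ExchangeType.PIPELINED) {
                pipelined.computeIfAbsent(e.source(), k -> new ArrayList<>()).add(e.target());
                pipelined.computeIfAbsent(e.target(), k -> new ArrayList<>()).add(e.source());
            }
        }
        Map<Integer, Integer> regionOf = new HashMap<>();
        int nextRegion = 0;
        for (int v = 0; v < numVertices; v++) {
            if (regionOf.containsKey(v)) {
                continue;
            }
            // BFS marks every vertex reachable through pipelined edges
            Deque<Integer> queue = new ArrayDeque<>(List.of(v));
            regionOf.put(v, nextRegion);
            while (!queue.isEmpty()) {
                int cur = queue.poll();
                for (int n : pipelined.getOrDefault(cur, List.of())) {
                    if (regionOf.putIfAbsent(n, nextRegion) == null) {
                        queue.add(n);
                    }
                }
            }
            nextRegion++;
        }
        return regionOf;
    }

    public static void main(String[] args) {
        // source -> mapper is pipelined (same region), mapper -> sink is blocking (new region)
        List<Edge> edges = List.of(
                new Edge(0, 1, ExchangeType.PIPELINED),
                new Edge(1, 2, ExchangeType.BLOCKING));
        System.out.println(computeRegions(3, edges)); // {0=0, 1=0, 2=1}
    }
}
```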
[jira] [Commented] (FLINK-4256) Fine-grained recovery
[ https://issues.apache.org/jira/browse/FLINK-4256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925615#comment-15925615 ] ASF GitHub Bot commented on FLINK-4256: ---

GitHub user shuai-xu opened a pull request:

    https://github.com/apache/flink/pull/3539

[FLINK-4256] Flip1: fine grained recovery

This is an informal PR for version 1 of the FLIP-1 implementation. It enables restarting only the minimal set of pipelined-connected executions when a task fails, instead of the whole execution graph.

Main changes:
1. ExecutionGraph no longer manages failover; it only records the number of finished JobVertices and turns to FINISHED when all vertices have finished (maybe later the FailoverCoordinator will take over this as well). Its state can now only be CREATED, RUNNING, FAILED, FINISHED or SUSPENDED.
2. FailoverCoordinator now manages failover. It generates several FailoverRegions when the execution graph (EG) is attached and listens for execution failures. When an execution fails, it finds the corresponding FailoverRegion to carry out the failover.
3. When the JobManager needs the EG to be canceled or failed, the EG also notifies the FailoverCoordinator. The FailoverCoordinator notifies all FailoverRegions to cancel their executions, and once all executions are canceled, it notifies the EG to turn to CANCELED or FAILED.
4. The FailoverCoordinator has several states: RUNNING, FAILING, CANCELLING, FAILED and CANCELED.
5. A FailoverRegion contains the minimal set of pipelined-connected executions and manages their failover.
6. A FailoverRegion has the states CREATED, RUNNING, CANCELLING and CANCELLED.
7. One FailoverRegion may be the successor or predecessor of others. When a preceding region fails over, all of its succeeding regions must fail over too. The succeeding regions only reset their executions and wait for the preceding region to start them once it finishes; the preceding region waits for its succeeding regions to be CREATED before scheduling again.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shuai-xu/flink jira-4256

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3539.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3539

commit 363f1536838064edbdd5f39e41f3f19f6c511fc4
Author: shuai.xus
Date: 2017-03-15T03:36:11Z

    [FLINK-4256] Flip1: fine grained recovery

> Fine-grained recovery
> ---------------------
>
> Key: FLINK-4256
> URL: https://issues.apache.org/jira/browse/FLINK-4256
> Project: Flink
> Issue Type: Improvement
> Components: JobManager
> Affects Versions: 1.1.0
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
>
> When a task fails during execution, Flink currently resets the entire execution graph and triggers complete re-execution from the last completed checkpoint. This is more expensive than just re-executing the failed tasks. In many cases, more fine-grained recovery is possible.
> The full description and design is in the corresponding FLIP.
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
> The detailed design for version 1 is
> https://docs.google.com/document/d/1_PqPLA1TJgjlqz8fqnVE3YSisYBDdFsrRX_URgRSj74/edit#

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor
[ https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925592#comment-15925592 ] Wenlong Lyu commented on FLINK-5756: In RocksDB, the merge operands are actually combined during compaction and during get(), not when merge() is called. When merging two Slices with a StringAppendOperator, a new string has to be created, which can be time-costly when there are thousands of slices to merge. I think that is why it is slow to get the value after you have added five thousand items to the list. If you call {{rocksdb.compactRange()}} before get(), it will be quite fast. In a real application scenario, compaction happens more often than in this test, so the performance will be much better in a real environment than in this extreme test scenario. > When there are many values under the same key in ListState, > RocksDBStateBackend performances poor > - > > Key: FLINK-5756 > URL: https://issues.apache.org/jira/browse/FLINK-5756 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 > Environment: CentOS 7.2 >Reporter: Syinchwun Leo > > When using RocksDB as the StateBackend, if there are many values under the > same key in ListState, the windowState.get() operator performances very poor. > I also the the RocksDB using version 4.11.2, the performance is also very > poor. The problem is likely to related to RocksDB itself's get() operator > after using merge(). The problem may influences the window operation's > performance when the size is very large using ListState. I try to merge 5 > values under the same key in RocksDB, It costs 120 seconds to execute get() > operation. > /// > The flink's code is as follows: > {code} > class SEventSource extends RichSourceFunction [SEvent] { > private var count = 0L > private val alphabet = > "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" > override def run(sourceContext: SourceContext[SEvent]): Unit = { > while (true) { > for (i <- 0 until 5000) { > sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) > count += 1L > } > Thread.sleep(1000) > } > } > } > env.addSource(new SEventSource) > .assignTimestampsAndWatermarks(new > AssignerWithPeriodicWatermarks[SEvent] { > override def getCurrentWatermark: Watermark = { > new Watermark(System.currentTimeMillis()) > } > override def extractTimestamp(t: SEvent, l: Long): Long = { > System.currentTimeMillis() > } > }) > .keyBy(0) > .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2))) > .apply(new WindowStatistic) > .map(x => (System.currentTimeMillis(), x)) > .print() > {code} > > The RocksDB Test code: > {code} > val stringAppendOperator = new StringAppendOperator > val options = new Options() > options.setCompactionStyle(CompactionStyle.LEVEL) > .setCompressionType(CompressionType.SNAPPY_COMPRESSION) > .setLevelCompactionDynamicLevelBytes(true) > .setIncreaseParallelism(4) > .setUseFsync(true) > .setMaxOpenFiles(-1) > .setCreateIfMissing(true) > .setMergeOperator(stringAppendOperator) > val write_options = new WriteOptions > write_options.setSync(false) > val rocksDB = RocksDB.open(options, "/**/Data/") > val key = "key" > val value = > "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321" > val beginmerge = System.currentTimeMillis() > for(i <- 0 to 5) { > rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes()) > //rocksDB.put(key.getBytes, value.getBytes) > } > 
println("finish") > val begin = System.currentTimeMillis() > rocksDB.get(key.getBytes) > val end = System.currentTimeMillis() > println("merge cost:" + (begin - beginmerge)) > println("Time consuming:" + (end - begin)) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6055) Supported setting timers on a Non-Keyed Stream
[ https://issues.apache.org/jira/browse/FLINK-6055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925575#comment-15925575 ] Xiaogang Shi commented on FLINK-6055: - I think it's very challenging: storing and restoring timers for non-keyed streams is very difficult. Do you have any ideas? > Supported setting timers on a Non-Keyed Stream > -- > > Key: FLINK-6055 > URL: https://issues.apache.org/jira/browse/FLINK-6055 > Project: Flink > Issue Type: New Feature >Reporter: sunjincheng >Assignee: sunjincheng > > After [FLINK-4460] Allow ProcessFunction on non-keyed streams, I would like to > support setting timers on a Non-Keyed Stream. What do you think? > [~aljoscha] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6055) Supported setting timers on a Non-Keyed Stream
sunjincheng created FLINK-6055: -- Summary: Supported setting timers on a Non-Keyed Stream Key: FLINK-6055 URL: https://issues.apache.org/jira/browse/FLINK-6055 Project: Flink Issue Type: New Feature Reporter: sunjincheng Assignee: sunjincheng After [FLINK-4460] Allow ProcessFunction on non-keyed streams, I would like to support setting timers on a Non-Keyed Stream. What do you think? [~aljoscha] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
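Until such support exists, the workaround people usually reach for is to key the stream by a constant, which makes the keyed timer service available at the price of funnelling all elements through a single key (i.e. a single parallel instance). A minimal sketch, not taken from the issue:

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class ConstantKeyTimerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
            // constant key: every element lands on the same key, so timers become available
            .keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return "all";
                }
            })
            .process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                    // register a processing-time timer one minute from now
                    ctx.timerService().registerProcessingTimeTimer(
                            ctx.timerService().currentProcessingTime() + 60_000);
                    out.collect(value);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
                    // delayed or periodic work for the "whole" stream goes here
                }
            })
            .print();

        env.execute("constant-key timer workaround");
    }
}
```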
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106073030 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** +* Extracts the indices of input fields which accessed by the RexProgram. +* +* @param rexProgram The RexProgram to analyze +* @return The indices of accessed input fields +*/ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { +val visitor = new InputRefVisitor + +// extract referenced input fields from projections +rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + +// extract referenced input fields from condition +val condition = rexProgram.getCondition +if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) +} + +visitor.getFields + } + + /** +* Extract condition from RexProgram and convert it into independent CNF expressions. 
+* +* @param rexProgram The RexProgram to analyze +* @return converted expressions as well as RexNodes which cannot be translated +*/ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + +rexProgram.getCondition match { + case condition: RexLocalRef => +val expanded = rexProgram.expandLocalRef(condition) +// converts the expanded expression to conjunctive normal form, +// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" +val cnf = RexUtil.toCnf(rexBuilder, expanded) +// converts the cnf condition to a list of AND conditions +val conjunctions = RelOptUtil.conjunctions(cnf) + +val convertedExpressions = new mutable.ArrayBuffer[Expression] +val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] +val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray +val converter = new ConvertToExpression(inputNames, catalog) + +conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { +case Some(expression) => convertedExpressions += expression +case None => unconvertedRexNodes += rex + } +}) +(convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) +} + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() + + def getFields: Array[Int] = fields.toArray + + override def visitInputRef(inputRef: RexInputRef): Unit = +fields += inputRef.getIndex + + override def visitCall(call: RexCall): Unit = +call.operands.foreach(operand => operand.accept(this)) +} + +/** + * An RexVisitor to convert RexNode to Expres
[GitHub] flink pull request #3520: [FLINK-3849] [table] Add FilterableTableSource int...
Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106072764 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** +* Extracts the indices of input fields which accessed by the RexProgram. +* +* @param rexProgram The RexProgram to analyze +* @return The indices of accessed input fields +*/ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { +val visitor = new InputRefVisitor + +// extract referenced input fields from projections +rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + +// extract referenced input fields from condition +val condition = rexProgram.getCondition +if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) +} + +visitor.getFields + } + + /** +* Extract condition from RexProgram and convert it into independent CNF expressions. 
+* +* @param rexProgram The RexProgram to analyze +* @return converted expressions as well as RexNodes which cannot be translated +*/ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + +rexProgram.getCondition match { + case condition: RexLocalRef => +val expanded = rexProgram.expandLocalRef(condition) +// converts the expanded expression to conjunctive normal form, +// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" +val cnf = RexUtil.toCnf(rexBuilder, expanded) +// converts the cnf condition to a list of AND conditions +val conjunctions = RelOptUtil.conjunctions(cnf) + +val convertedExpressions = new mutable.ArrayBuffer[Expression] +val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] +val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray +val converter = new ConvertToExpression(inputNames, catalog) + +conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { +case Some(expression) => convertedExpressions += expression +case None => unconvertedRexNodes += rex + } +}) +(convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) +} + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() --- End diff -- `var` → `val` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3486: [FLINK-5981][SECURITY]make ssl version and cipher suites ...
Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3486 @vijikarthi I've checked the [JDK doc](http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites) and did not find any notes about the combination of SSL version and cipher suites. About cipher suites, it says `The following list contains the standard JSSE cipher suite names. Over time, various groups have added additional cipher suites to the SSL/TLS namespace.`, so I think we had better not add an additional description about that and let users follow the JRE/JDK rules. @StephanEwen The two comments you mentioned have been fixed :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
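For reference, the JSSE mechanism being discussed boils down to two calls on the engine (or socket). The sketch below is illustrative only; the protocol and cipher suite names are merely examples of valid JSSE standard names, not Flink's defaults, and the SSLContext is initialized with default managers purely for brevity.

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

public class SslRestrictionSketch {
    public static void main(String[] args) throws Exception {
        SSLContext context = SSLContext.getInstance("TLS");
        context.init(null, null, null); // default key/trust managers, for illustration only

        SSLEngine engine = context.createSSLEngine();
        // Constrain the handshake to the configured protocol version(s) ...
        engine.setEnabledProtocols(new String[] {"TLSv1.2"});
        // ... and to the configured cipher suites. The names must be valid JSSE
        // standard names for the JRE in use, which is why the discussion above
        // defers to the JDK documentation instead of re-listing them.
        engine.setEnabledCipherSuites(new String[] {
                "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"
        });

        System.out.println(String.join(", ", engine.getEnabledProtocols()));
        System.out.println(String.join(", ", engine.getEnabledCipherSuites()));
    }
}
```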
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925442#comment-15925442 ] ASF GitHub Bot commented on FLINK-3849: --- Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106072764 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** +* Extracts the indices of input fields which accessed by the RexProgram. +* +* @param rexProgram The RexProgram to analyze +* @return The indices of accessed input fields +*/ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { +val visitor = new InputRefVisitor + +// extract referenced input fields from projections +rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + +// extract referenced input fields from condition +val condition = rexProgram.getCondition +if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) +} + +visitor.getFields + } + + /** +* Extract condition from RexProgram and convert it into independent CNF expressions. 
+* +* @param rexProgram The RexProgram to analyze +* @return converted expressions as well as RexNodes which cannot be translated +*/ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + +rexProgram.getCondition match { + case condition: RexLocalRef => +val expanded = rexProgram.expandLocalRef(condition) +// converts the expanded expression to conjunctive normal form, +// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" +val cnf = RexUtil.toCnf(rexBuilder, expanded) +// converts the cnf condition to a list of AND conditions +val conjunctions = RelOptUtil.conjunctions(cnf) + +val convertedExpressions = new mutable.ArrayBuffer[Expression] +val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] +val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray +val converter = new ConvertToExpression(inputNames, catalog) + +conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { +case Some(expression) => convertedExpressions += expression +case None => unconvertedRexNodes += rex + } +}) +(convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) +} + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() --- End diff -- `var` → `val` > Add FilterableTableSource interface and translation rule
[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule
[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925441#comment-15925441 ] ASF GitHub Bot commented on FLINK-3849: --- Github user godfreyhe commented on a diff in the pull request: https://github.com/apache/flink/pull/3520#discussion_r106073030 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.util + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rex._ +import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference} +import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +object RexProgramExtractor { + + /** +* Extracts the indices of input fields which accessed by the RexProgram. +* +* @param rexProgram The RexProgram to analyze +* @return The indices of accessed input fields +*/ + def extractRefInputFields(rexProgram: RexProgram): Array[Int] = { +val visitor = new InputRefVisitor + +// extract referenced input fields from projections +rexProgram.getProjectList.foreach( + exp => rexProgram.expandLocalRef(exp).accept(visitor)) + +// extract referenced input fields from condition +val condition = rexProgram.getCondition +if (condition != null) { + rexProgram.expandLocalRef(condition).accept(visitor) +} + +visitor.getFields + } + + /** +* Extract condition from RexProgram and convert it into independent CNF expressions. 
+* +* @param rexProgram The RexProgram to analyze +* @return converted expressions as well as RexNodes which cannot be translated +*/ + def extractConjunctiveConditions( + rexProgram: RexProgram, + rexBuilder: RexBuilder, + catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = { + +rexProgram.getCondition match { + case condition: RexLocalRef => +val expanded = rexProgram.expandLocalRef(condition) +// converts the expanded expression to conjunctive normal form, +// like "(a AND b) OR c" will be converted to "(a OR c) AND (b OR c)" +val cnf = RexUtil.toCnf(rexBuilder, expanded) +// converts the cnf condition to a list of AND conditions +val conjunctions = RelOptUtil.conjunctions(cnf) + +val convertedExpressions = new mutable.ArrayBuffer[Expression] +val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode] +val inputNames = rexProgram.getInputRowType.getFieldNames.asScala.toArray +val converter = new ConvertToExpression(inputNames, catalog) + +conjunctions.asScala.foreach(rex => { + rex.accept(converter) match { +case Some(expression) => convertedExpressions += expression +case None => unconvertedRexNodes += rex + } +}) +(convertedExpressions.toArray, unconvertedRexNodes.toArray) + + case _ => (Array.empty, Array.empty) +} + } +} + +/** + * An RexVisitor to extract all referenced input fields + */ +class InputRefVisitor extends RexVisitorImpl[Unit](true) { + + private var fields = mutable.LinkedHashSet[Int]() + + def getFields: Array[Int] = fields.toArray + + override def visitInputRef(inputRef
[jira] [Updated] (FLINK-6039) Row of TableFunction should support flexible number of fields
[ https://issues.apache.org/jira/browse/FLINK-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young updated FLINK-6039: -- Component/s: Table API & SQL > Row of TableFunction should support flexible number of fields > - > > Key: FLINK-6039 > URL: https://issues.apache.org/jira/browse/FLINK-6039 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang > > In actual world, especially while processing logs with TableFunction. The > formats of the logs in actual world are flexible. Thus, the number of fields > should not be fixed. > For examples, we should make the three following types of of TableFunction > work. > {code} > // Test for incomplete row > class TableFunc4 extends TableFunction[Row] { > def eval(str: String): Unit = { > if (str.contains("#")) { > str.split("#").foreach({ s => > val row = new Row(3) > row.setField(0, s) // And we only set values for one column > collect(row) > }) > } > } > override def getResultType: TypeInformation[Row] = { > new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, > BasicTypeInfo.INT_TYPE_INFO, > BasicTypeInfo.INT_TYPE_INFO) > } > } > // Test for incomplete row > class TableFunc5 extends TableFunction[Row] { > def eval(str: String): Unit = { > if (str.contains("#")) { > str.split("#").foreach({ s => > val row = new Row(1) // ResultType is three columns, we have only > one here > row.setField(0, s) > collect(row) > }) > } > } > override def getResultType: TypeInformation[Row] = { > new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, > BasicTypeInfo.INT_TYPE_INFO, > BasicTypeInfo.INT_TYPE_INFO) > } > } > // Test for overflow row > class TableFunc6 extends TableFunction[Row] { > def eval(str: String): Unit = { > if (str.contains("#")) { > str.split("#").foreach({ s => > val row = new Row(5) // ResultType is two columns, we have five > columns here > row.setField(0, s) > row.setField(1, s.length) > row.setField(2, s.length) > row.setField(3, s.length) > row.setField(4, s.length) > collect(row) > }) > } > } > override def getResultType: TypeInformation[Row] = { > new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, > BasicTypeInfo.INT_TYPE_INFO) > } > } > {code} > Actually, the TableFunc4 and TableFunc6 has already worked correctly with > current version. This issue will make TableFunc5 works. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6036) Let catalog support partition
[ https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jingzhang updated FLINK-6036: - Description: Now catalog only support CRUD at database and table level. But in some kind of catalog, for example for hive, we also need do CRUD operations at partition level. This issue aims to let catalog support partition. was: Now catalog only support CRUD at database and table level. But in some kind of catalog, for example for hive, we also need do CRUD operations on partition level. This issue aims to let catalog support partition. > Let catalog support partition > - > > Key: FLINK-6036 > URL: https://issues.apache.org/jira/browse/FLINK-6036 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: jingzhang >Assignee: jingzhang > > Now catalog only support CRUD at database and table level. But in some kind > of catalog, for example for hive, we also need do CRUD operations at > partition level. > This issue aims to let catalog support partition. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
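To make the intended scope concrete, a hypothetical sketch of what partition-level operations alongside the existing database/table-level ones could look like; all names below are illustrative only and are not the API actually proposed in the design.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of partition-level CRUD, mirroring the existing
 *  database/table-level catalog operations; all names are illustrative. */
public interface PartitionAwareCatalog {

    /** A partition is identified by its table plus a partition spec,
     *  e.g. {region=EU, day=2017-03-15}. */
    void createPartition(String dbName, String tableName,
                         Map<String, String> partitionSpec, boolean ignoreIfExists);

    void dropPartition(String dbName, String tableName,
                       Map<String, String> partitionSpec, boolean ignoreIfNotExists);

    void alterPartition(String dbName, String tableName,
                        Map<String, String> partitionSpec, Map<String, String> newProperties);

    Map<String, String> getPartition(String dbName, String tableName,
                                     Map<String, String> partitionSpec);

    List<Map<String, String>> listPartitions(String dbName, String tableName);
}
```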
[jira] [Updated] (FLINK-6036) Let catalog support partition
[ https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jingzhang updated FLINK-6036: - Description: Now catalog only support CRUD at database and table level. But in some kind of catalog, for example for hive, we also need do CRUD operations on partition level. This issue aims to let catalog support partition. > Let catalog support partition > - > > Key: FLINK-6036 > URL: https://issues.apache.org/jira/browse/FLINK-6036 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: jingzhang > > Now catalog only support CRUD at database and table level. But in some kind > of catalog, for example for hive, we also need do CRUD operations on > partition level. > This issue aims to let catalog support partition. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6036) Let catalog support partition
[ https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jingzhang reassigned FLINK-6036: Assignee: jingzhang > Let catalog support partition > - > > Key: FLINK-6036 > URL: https://issues.apache.org/jira/browse/FLINK-6036 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: jingzhang >Assignee: jingzhang > > Now catalog only support CRUD at database and table level. But in some kind > of catalog, for example for hive, we also need do CRUD operations on > partition level. > This issue aims to let catalog support partition. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5981) SSL version and ciper suites cannot be constrained as configured
[ https://issues.apache.org/jira/browse/FLINK-5981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925423#comment-15925423 ] ASF GitHub Bot commented on FLINK-5981: --- Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3486 @vijikarthi I've checked the [JDK doc](http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites) and did not find any notes about the combination of SSL version and cipher suites. About cipher suites, it says `The following list contains the standard JSSE cipher suite names. Over time, various groups have added additional cipher suites to the SSL/TLS namespace.`, so I think we had better not add an additional description about that and let users follow the JRE/JDK rules. @StephanEwen The two comments you mentioned have been fixed :) > SSL version and ciper suites cannot be constrained as configured > > > Key: FLINK-5981 > URL: https://issues.apache.org/jira/browse/FLINK-5981 > Project: Flink > Issue Type: Bug > Components: Security >Reporter: Tao Wang >Assignee: Tao Wang > > I configured ssl and start flink job, but found configured properties cannot > apply properly: > akka port: only ciper suites apply right, ssl version not > blob server/netty server: both ssl version and ciper suites are not like what > I configured > I've found out the reason why: > http://stackoverflow.com/questions/11504173/sslcontext-initialization (for > blob server and netty server) > https://groups.google.com/forum/#!topic/akka-user/JH6bGnWE8kY(for akka ssl > version, it's fixed in akka 2.4:https://github.com/akka/akka/pull/21078) > I'll fix the issue on blob server and netty server, and it seems like only > upgrade for akka can solve issue in akka side(we'll consider later as upgrade > is not a small action). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor
[ https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925393#comment-15925393 ] Syinchwun Leo edited comment on FLINK-5756 at 3/15/17 1:43 AM: --- Is it possible to avoid using the merge() operation? I notice that the result of RocksDB's get() is a byte array. My point is that when the add() method of RocksDBListState is called, it could call get() first to obtain the byte array, append the new value's serialized byte[] to it, and then put it back to RocksDB. This approach makes it possible to keep only a single byte[] under the key. I haven't tested the idea; maybe the performance is not perfect and it is a bit awkward. was (Author: syinchwunleo): Is it possible that avoiding using merge() operation. I notice that the result of RocksDB's get() is a byte array. My point is that when calling add() method of RocksDBListState, call get() first and get byte array, then append new value's serialized byte[] to byte array, then set to Rocks. I haven't test the idea, maybe the performance is not perfect and awkward. > When there are many values under the same key in ListState, > RocksDBStateBackend performances poor > - > > Key: FLINK-5756 > URL: https://issues.apache.org/jira/browse/FLINK-5756 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 > Environment: CentOS 7.2 >Reporter: Syinchwun Leo > > When using RocksDB as the StateBackend, if there are many values under the > same key in ListState, the windowState.get() operator performances very poor. > I also the the RocksDB using version 4.11.2, the performance is also very > poor. The problem is likely to related to RocksDB itself's get() operator > after using merge(). The problem may influences the window operation's > performance when the size is very large using ListState. I try to merge 5 > values under the same key in RocksDB, It costs 120 seconds to execute get() > operation. 
> /// > The flink's code is as follows: > {code} > class SEventSource extends RichSourceFunction [SEvent] { > private var count = 0L > private val alphabet = > "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" > override def run(sourceContext: SourceContext[SEvent]): Unit = { > while (true) { > for (i <- 0 until 5000) { > sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) > count += 1L > } > Thread.sleep(1000) > } > } > } > env.addSource(new SEventSource) > .assignTimestampsAndWatermarks(new > AssignerWithPeriodicWatermarks[SEvent] { > override def getCurrentWatermark: Watermark = { > new Watermark(System.currentTimeMillis()) > } > override def extractTimestamp(t: SEvent, l: Long): Long = { > System.currentTimeMillis() > } > }) > .keyBy(0) > .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2))) > .apply(new WindowStatistic) > .map(x => (System.currentTimeMillis(), x)) > .print() > {code} > > The RocksDB Test code: > {code} > val stringAppendOperator = new StringAppendOperator > val options = new Options() > options.setCompactionStyle(CompactionStyle.LEVEL) > .setCompressionType(CompressionType.SNAPPY_COMPRESSION) > .setLevelCompactionDynamicLevelBytes(true) > .setIncreaseParallelism(4) > .setUseFsync(true) > .setMaxOpenFiles(-1) > .setCreateIfMissing(true) > .setMergeOperator(stringAppendOperator) > val write_options = new WriteOptions > write_options.setSync(false) > val rocksDB = RocksDB.open(options, "/**/Data/") > val key = "key" > val value = > "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321" > val beginmerge = System.currentTimeMillis() > for(i <- 0 to 5) { > rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes()) > //rocksDB.put(key.getBytes, value.getBytes) > } > println("finish") > val begin = System.currentTimeMillis() > rocksDB.get(key.getBytes) > val end = System.currentTimeMillis() > println("merge cost:" + (begin - beginmerge)) > println("Time consuming:" + (end - begin)) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
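The read-modify-write alternative described in the edited comment above would look roughly like the sketch below against the plain RocksDB Java API (this is not Flink's RocksDBListState code; the separator byte and path are arbitrary). Note the trade-off: every add() now rereads and rewrites the whole list, so it avoids the merge-operand folding on get() at the cost of more work per write.

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class GetAppendPutSketch {

    private static final byte[] SEPARATOR = new byte[] {','};

    /** Append one serialized element to the list stored under key,
     *  using get() + put() instead of merge(). */
    static void addToList(RocksDB db, byte[] key, byte[] serializedElement) throws RocksDBException {
        byte[] existing = db.get(key);
        if (existing == null) {
            db.put(key, serializedElement);
            return;
        }
        byte[] combined = new byte[existing.length + SEPARATOR.length + serializedElement.length];
        System.arraycopy(existing, 0, combined, 0, existing.length);
        System.arraycopy(SEPARATOR, 0, combined, existing.length, SEPARATOR.length);
        System.arraycopy(serializedElement, 0, combined,
                existing.length + SEPARATOR.length, serializedElement.length);
        db.put(key, combined);
    }

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-list-sketch")) {
            byte[] key = "key".getBytes();
            for (int i = 0; i < 1_000; i++) {
                addToList(db, key, ("value-" + i).getBytes());
            }
            System.out.println("stored bytes: " + db.get(key).length);
        }
    }
}
```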
[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor
[ https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925393#comment-15925393 ] Syinchwun Leo commented on FLINK-5756: -- Is it possible that avoiding using merge() operation. I notice that the result of RocksDB's get() is a byte array. My point is that when calling add() method of RocksDBListState, call get() first and get byte array, then append new value's serialized byte[] to byte array, then set to Rocks. I haven't test the idea, maybe the performance is not perfect and awkward. > When there are many values under the same key in ListState, > RocksDBStateBackend performances poor > - > > Key: FLINK-5756 > URL: https://issues.apache.org/jira/browse/FLINK-5756 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 > Environment: CentOS 7.2 >Reporter: Syinchwun Leo > > When using RocksDB as the StateBackend, if there are many values under the > same key in ListState, the windowState.get() operator performances very poor. > I also the the RocksDB using version 4.11.2, the performance is also very > poor. The problem is likely to related to RocksDB itself's get() operator > after using merge(). The problem may influences the window operation's > performance when the size is very large using ListState. I try to merge 5 > values under the same key in RocksDB, It costs 120 seconds to execute get() > operation. > /// > The flink's code is as follows: > {code} > class SEventSource extends RichSourceFunction [SEvent] { > private var count = 0L > private val alphabet = > "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" > override def run(sourceContext: SourceContext[SEvent]): Unit = { > while (true) { > for (i <- 0 until 5000) { > sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) > count += 1L > } > Thread.sleep(1000) > } > } > } > env.addSource(new SEventSource) > .assignTimestampsAndWatermarks(new > AssignerWithPeriodicWatermarks[SEvent] { > override def getCurrentWatermark: Watermark = { > new Watermark(System.currentTimeMillis()) > } > override def extractTimestamp(t: SEvent, l: Long): Long = { > System.currentTimeMillis() > } > }) > .keyBy(0) > .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2))) > .apply(new WindowStatistic) > .map(x => (System.currentTimeMillis(), x)) > .print() > {code} > > The RocksDB Test code: > {code} > val stringAppendOperator = new StringAppendOperator > val options = new Options() > options.setCompactionStyle(CompactionStyle.LEVEL) > .setCompressionType(CompressionType.SNAPPY_COMPRESSION) > .setLevelCompactionDynamicLevelBytes(true) > .setIncreaseParallelism(4) > .setUseFsync(true) > .setMaxOpenFiles(-1) > .setCreateIfMissing(true) > .setMergeOperator(stringAppendOperator) > val write_options = new WriteOptions > write_options.setSync(false) > val rocksDB = RocksDB.open(options, "/**/Data/") > val key = "key" > val value = > "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321" > val beginmerge = System.currentTimeMillis() > for(i <- 0 to 5) { > rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes()) > //rocksDB.put(key.getBytes, value.getBytes) > } > println("finish") > val begin = System.currentTimeMillis() > rocksDB.get(key.getBytes) > val end = System.currentTimeMillis() > println("merge cost:" + (begin - beginmerge)) > println("Time consuming:" + (end - begin)) > } > } > {code} -- This 
message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6040) DataStreamUserDefinedFunctionITCase occasionally fails
[ https://issues.apache.org/jira/browse/FLINK-6040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925373#comment-15925373 ] ASF GitHub Bot commented on FLINK-6040: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3530 > DataStreamUserDefinedFunctionITCase occasionally fails > -- > > Key: FLINK-6040 > URL: https://issues.apache.org/jira/browse/FLINK-6040 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0 >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang >Priority: Trivial > Labels: test > Fix For: 1.3.0 > > > Three test cases in DataStreamUserDefinedFunctionITCase forgot to call the > StreamITCase.clear method. This will cause it occasionally fails. Because the > result of one case may affect another sometimes. > {code} > java.lang.AssertionError: > Expected :MutableList(Anna#44,1, Anna#44,2, Anna#44,Anna#44, Jack#22,1, > Jack#22,2, Jack#22,Jack#22, John#19,1, John#19,2, John#19,John#19, nosharp,1, > nosharp,2, nosharp,nosharp) > Actual :MutableList(Anna#44,1, Anna#44,2, Anna#44,Anna#44, Jack#22,1, > Jack#22,2, Jack#22,Jack#22, John#19,1, John#19,2, John#19,John#19, > Miller,13.56, Smith,180.2, Williams,4.68, Williams,69.0, nosharp,1, > nosharp,2, nosharp,nosharp) > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.table.runtime.datastream.DataStreamUserDefinedFunctionITCase.testTableFunctionWithVariableArguments(DataStreamUserDefinedFunctionITCase.scala:226) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
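The underlying failure mode is the classic shared-static-test-state problem. The actual fix is a one-line StreamITCase.clear call in the Scala tests; the pattern can be illustrated with a plain JUnit sketch (hypothetical class and fields, not Flink's test code).

```java
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.List;

/** Illustration of the failure mode: a static sink buffer shared across
 *  test methods must be cleared before each test, otherwise the results of
 *  one test leak into the expected output of the next. */
public class SharedSinkStateTest {

    // stand-in for a shared static result buffer such as StreamITCase.testResults
    static final List<String> RESULTS = new ArrayList<>();

    @Before
    public void clearSharedState() {
        RESULTS.clear(); // the equivalent of calling StreamITCase.clear in each test
    }

    @Test
    public void firstTest() {
        RESULTS.add("a");
        assertEquals(List.of("a"), RESULTS);
    }

    @Test
    public void secondTest() {
        RESULTS.add("b");
        // without the @Before clear, this would see ["a", "b"] when run after firstTest
        assertEquals(List.of("b"), RESULTS);
    }
}
```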
[GitHub] flink pull request #3530: [FLINK-6040] [table] DataStreamUserDefinedFunction...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3530 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-6040) DataStreamUserDefinedFunctionITCase occasionally fails
[ https://issues.apache.org/jira/browse/FLINK-6040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-6040. - Resolution: Fixed > DataStreamUserDefinedFunctionITCase occasionally fails > -- > > Key: FLINK-6040 > URL: https://issues.apache.org/jira/browse/FLINK-6040 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0 >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang >Priority: Trivial > Labels: test > Fix For: 1.3.0 > > > Three test cases in DataStreamUserDefinedFunctionITCase forgot to call the > StreamITCase.clear method. This will cause it occasionally fails. Because the > result of one case may affect another sometimes. > {code} > java.lang.AssertionError: > Expected :MutableList(Anna#44,1, Anna#44,2, Anna#44,Anna#44, Jack#22,1, > Jack#22,2, Jack#22,Jack#22, John#19,1, John#19,2, John#19,John#19, nosharp,1, > nosharp,2, nosharp,nosharp) > Actual :MutableList(Anna#44,1, Anna#44,2, Anna#44,Anna#44, Jack#22,1, > Jack#22,2, Jack#22,Jack#22, John#19,1, John#19,2, John#19,John#19, > Miller,13.56, Smith,180.2, Williams,4.68, Williams,69.0, nosharp,1, > nosharp,2, nosharp,nosharp) > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.table.runtime.datastream.DataStreamUserDefinedFunctionITCase.testTableFunctionWithVariableArguments(DataStreamUserDefinedFunctionITCase.scala:226) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3535: [FLINK-5713] Protect against NPE in WindowOperator...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3535#discussion_r106065855 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -354,22 +354,27 @@ public void merge(W mergeResult, } }); - // drop if the window is already late - if (isLate(actualWindow)) { - mergingWindows.retireWindow(actualWindow); - continue; - } + context.key = key; + context.window = actualWindow; W stateWindow = mergingWindows.getStateWindow(actualWindow); if (stateWindow == null) { throw new IllegalStateException("Window " + window + " is not in in-flight window set."); } windowState.setCurrentNamespace(stateWindow); - windowState.add(element.getValue()); - context.key = key; - context.window = actualWindow; + // Drop if the window is already late. In rare cases (with a misbehaving + // WindowAssigner) it can happen that a window becomes late that already has + // state (contents, state and timers). That's why we first get the window state + // above and then drop everything. + if (isLate(actualWindow)) { + clearAllState(actualWindow, windowState, mergingWindows); + mergingWindows.persist(); --- End diff -- Yes, in fact, I had seen that, but I did not realize that it could be deleted. Haha, you are very quick-witted. :) Anyway, I must say thanks for your explanation. Best, SunJincheng --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5713) Protect against NPE in WindowOperator window cleanup
[ https://issues.apache.org/jira/browse/FLINK-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925371#comment-15925371 ] ASF GitHub Bot commented on FLINK-5713: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3535#discussion_r106065855 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -354,22 +354,27 @@ public void merge(W mergeResult, } }); - // drop if the window is already late - if (isLate(actualWindow)) { - mergingWindows.retireWindow(actualWindow); - continue; - } + context.key = key; + context.window = actualWindow; W stateWindow = mergingWindows.getStateWindow(actualWindow); if (stateWindow == null) { throw new IllegalStateException("Window " + window + " is not in in-flight window set."); } windowState.setCurrentNamespace(stateWindow); - windowState.add(element.getValue()); - context.key = key; - context.window = actualWindow; + // Drop if the window is already late. In rare cases (with a misbehaving + // WindowAssigner) it can happen that a window becomes late that already has + // state (contents, state and timers). That's why we first get the window state + // above and then drop everything. + if (isLate(actualWindow)) { + clearAllState(actualWindow, windowState, mergingWindows); + mergingWindows.persist(); --- End diff -- Yes,in fact,I had seen that, but I did not realize that it could be deleted.Haha, You are very quick-witted.:) Anyway, I must say thanks for your explaining. Best, SunJincheng > Protect against NPE in WindowOperator window cleanup > > > Key: FLINK-5713 > URL: https://issues.apache.org/jira/browse/FLINK-5713 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.2.1 > > > Some (misbehaved) WindowAssigners can cause windows to be dropped from the > merging window set while a cleanup timer is still active. This will trigger a > NullPointerException when that timer fires. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor
[ https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925364#comment-15925364 ] Syinchwun Leo commented on FLINK-5756: -- OK, this problem not only influences the performance of UDF windows but also checkpointing. Poor window performance leads to many tuples waiting in the IO buffers to be processed, so the barrier cannot be processed in time. This may result in checkpoint failures. > When there are many values under the same key in ListState, > RocksDBStateBackend performances poor > - > > Key: FLINK-5756 > URL: https://issues.apache.org/jira/browse/FLINK-5756 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 > Environment: CentOS 7.2 >Reporter: Syinchwun Leo > > When using RocksDB as the StateBackend, if there are many values under the > same key in ListState, the windowState.get() operator performances very poor. > I also the the RocksDB using version 4.11.2, the performance is also very > poor. The problem is likely to related to RocksDB itself's get() operator > after using merge(). The problem may influences the window operation's > performance when the size is very large using ListState. I try to merge 5 > values under the same key in RocksDB, It costs 120 seconds to execute get() > operation. > /// > The flink's code is as follows: > {code} > class SEventSource extends RichSourceFunction [SEvent] { > private var count = 0L > private val alphabet = > "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" > override def run(sourceContext: SourceContext[SEvent]): Unit = { > while (true) { > for (i <- 0 until 5000) { > sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) > count += 1L > } > Thread.sleep(1000) > } > } > } > env.addSource(new SEventSource) > .assignTimestampsAndWatermarks(new > AssignerWithPeriodicWatermarks[SEvent] { > override def getCurrentWatermark: Watermark = { > new Watermark(System.currentTimeMillis()) > } > override def extractTimestamp(t: SEvent, l: Long): Long = { > System.currentTimeMillis() > } > }) > .keyBy(0) > .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2))) > .apply(new WindowStatistic) > .map(x => (System.currentTimeMillis(), x)) > .print() > {code} > > The RocksDB Test code: > {code} > val stringAppendOperator = new StringAppendOperator > val options = new Options() > options.setCompactionStyle(CompactionStyle.LEVEL) > .setCompressionType(CompressionType.SNAPPY_COMPRESSION) > .setLevelCompactionDynamicLevelBytes(true) > .setIncreaseParallelism(4) > .setUseFsync(true) > .setMaxOpenFiles(-1) > .setCreateIfMissing(true) > .setMergeOperator(stringAppendOperator) > val write_options = new WriteOptions > write_options.setSync(false) > val rocksDB = RocksDB.open(options, "/**/Data/") > val key = "key" > val value = > "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321" > val beginmerge = System.currentTimeMillis() > for(i <- 0 to 5) { > rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes()) > //rocksDB.put(key.getBytes, value.getBytes) > } > println("finish") > val begin = System.currentTimeMillis() > rocksDB.get(key.getBytes) > val end = System.currentTimeMillis() > println("merge cost:" + (begin - beginmerge)) > println("Time consuming:" + (end - begin)) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6040) DataStreamUserDefinedFunctionITCase occasionally fails
[ https://issues.apache.org/jira/browse/FLINK-6040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925359#comment-15925359 ] ASF GitHub Bot commented on FLINK-6040: --- Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3530 looks good, merging > DataStreamUserDefinedFunctionITCase occasionally fails > -- > > Key: FLINK-6040 > URL: https://issues.apache.org/jira/browse/FLINK-6040 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.3.0 >Reporter: Zhuoluo Yang >Assignee: Zhuoluo Yang >Priority: Trivial > Labels: test > Fix For: 1.3.0 > > > Three test cases in DataStreamUserDefinedFunctionITCase forgot to call the > StreamITCase.clear method. This will cause it occasionally fails. Because the > result of one case may affect another sometimes. > {code} > java.lang.AssertionError: > Expected :MutableList(Anna#44,1, Anna#44,2, Anna#44,Anna#44, Jack#22,1, > Jack#22,2, Jack#22,Jack#22, John#19,1, John#19,2, John#19,John#19, nosharp,1, > nosharp,2, nosharp,nosharp) > Actual :MutableList(Anna#44,1, Anna#44,2, Anna#44,Anna#44, Jack#22,1, > Jack#22,2, Jack#22,Jack#22, John#19,1, John#19,2, John#19,John#19, > Miller,13.56, Smith,180.2, Williams,4.68, Williams,69.0, nosharp,1, > nosharp,2, nosharp,nosharp) > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:118) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.flink.table.runtime.datastream.DataStreamUserDefinedFunctionITCase.testTableFunctionWithVariableArguments(DataStreamUserDefinedFunctionITCase.scala:226) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3530: [FLINK-6040] [table] DataStreamUserDefinedFunctionITCase ...
Github user KurtYoung commented on the issue: https://github.com/apache/flink/pull/3530 looks good, merging --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything
[ https://issues.apache.org/jira/browse/FLINK-6053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925257#comment-15925257 ] Bowen Li commented on FLINK-6053: - hm, I found that Flink's Gauge class is used like this in `org.apache.flink.runtime.checkpoint.CheckpointStatsTracker`, which doesn't make much sense to me ``` private class LatestCompletedCheckpointExternalPathGauge implements Gauge<String> { @Override public String getValue() { CompletedCheckpointStats completed = latestSnapshot.getHistory().getLatestCompletedCheckpoint(); if (completed != null) { return completed.getExternalPath(); } else { return "n/a"; } } } ``` > Gauge should only take subclasses of Number, rather than everything > -- > > Key: FLINK-6053 > URL: https://issues.apache.org/jira/browse/FLINK-6053 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.2.0 >Reporter: Bowen Li > Fix For: 1.3.0 > > > Currently, Flink's Gauge is defined as > ```java > public interface Gauge extends Metric { > T getValue(); > } > ``` > But it doesn't make sense to have Gauge take generic types other than Number. > And it blocks I from finishing FLINK-6013, because I cannot assume Gauge is > only about Number. So the class should be like > ```java > public interface Gauge extends Metric { > T getValue(); > } > ``` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
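For illustration, a minimal sketch of what the proposed bound would look like and why the String-valued gauge quoted above is exactly the kind of usage it would rule out (the interface name here is hypothetical, not Flink's actual API):
{code}
/** Sketch of the proposal: bound the gauge's type parameter to Number. */
interface NumberGauge<T extends Number> {
    T getValue();
}

/** A numeric gauge compiles fine under the proposed bound. */
class PendingRecordsGauge implements NumberGauge<Long> {
    private long pending = 0L;

    @Override
    public Long getValue() {
        return pending;
    }
}

// A String-valued gauge like LatestCompletedCheckpointExternalPathGauge above would
// no longer compile if the bound were introduced:
// class ExternalPathGauge implements NumberGauge<String> { ... }   // does not compile
{code}
So either the bound is not introduced, or gauges like the external-path one would have to expose their value through some other metric type.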
[jira] [Created] (FLINK-6054) Add new state backend that dynamically stores data in memory and external storage
Sergio Esteves created FLINK-6054: - Summary: Add new state backend that dynamically stores data in memory and external storage Key: FLINK-6054 URL: https://issues.apache.org/jira/browse/FLINK-6054 Project: Flink Issue Type: New Feature Components: State Backends, Checkpointing Reporter: Sergio Esteves Priority: Minor This feature would be useful for memory-intensive applications that need to maintain state for long periods of time; e.g., an event-time streaming application with long-lived windows that tolerate large amounts of lateness. This feature would allow the state to scale and, in the example above, tolerate a very large (possibly unbounded) amount of lateness, which can be useful in a number of scenarios, such as that of Photon in the Google Advertising System (white paper: "Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams"). In a nutshell, the idea would be to have a quota for the maximum memory that a state cell (different keys and namespaces) can occupy. When that quota gets fully occupied, new state data would be written out to disk. Then, when state needs to be retrieved, data is read entirely from memory - persisted data is loaded into memory in the background at the same time that data pertaining to the quota is being fetched (this reduces I/O overhead). Different policies, defining when to offload/load data from/to memory, can be implemented to govern the overall memory utilization. We already have a preliminary implementation with promising results in terms of memory savings (in the context of streaming applications with windows that tolerate lateness). More details are to be given soon through a design document. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
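A purely illustrative sketch of the quota idea described above, assuming elements are plain strings and the external storage is a local file (all names are hypothetical; this is not a proposed Flink API):
{code}
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical state cell: keep elements in memory up to a byte quota, spill the rest. */
public class SpillingListCellSketch {

    private final long quotaBytes;
    private final File spillFile;
    private final List<String> inMemory = new ArrayList<>();
    private long usedBytes;

    public SpillingListCellSketch(long quotaBytes, File spillFile) {
        this.quotaBytes = quotaBytes;
        this.spillFile = spillFile;
    }

    public void add(String value) throws IOException {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        if (usedBytes + bytes.length <= quotaBytes) {
            inMemory.add(value);                 // quota not yet exhausted: keep in memory
            usedBytes += bytes.length;
        } else {
            // quota exhausted: append the element to external storage (length-prefixed)
            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(spillFile, true))) {
                out.writeInt(bytes.length);
                out.write(bytes);
            }
        }
    }

    public List<String> get() throws IOException {
        List<String> result = new ArrayList<>(inMemory);
        if (spillFile.exists()) {
            // load the persisted part back while the in-memory part is already available
            try (DataInputStream in = new DataInputStream(new FileInputStream(spillFile))) {
                while (in.available() > 0) {
                    byte[] buf = new byte[in.readInt()];
                    in.readFully(buf);
                    result.add(new String(buf, StandardCharsets.UTF_8));
                }
            }
        }
        return result;
    }
}
{code}
The offload/load policies mentioned in the issue would plug in where the quota check happens.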
[jira] [Updated] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything
[ https://issues.apache.org/jira/browse/FLINK-6053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-6053: Description: Currently, Flink's Gauge is defined as ```java public interface Gauge extends Metric { T getValue(); } ``` But it doesn't make sense to have Gauge take generic types other than Number. And it blocks I from finishing FLINK-6013, because I cannot assume Gauge is only about Number. So the class should be like ```java public interface Gauge extends Metric { T getValue(); } ``` was: Currently, Flink's Gauge is defined as ```java /** * A Gauge is a {@link Metric} that calculates a specific value at a point in time. */ public interface Gauge extends Metric { T getValue(); } ``` But it doesn't make sense to have Gauge take generic types other than Number. And it blocks I from finishing FLINK-6013, because I cannot assume Gauge is only about Number. So the class should be like ``` /** * A Gauge is a {@link Metric} that calculates a specific value at a point in time. */ public interface Gauge extends Metric { T getValue(); } ``` > Gauge should only take subclasses of Number, rather than everything > -- > > Key: FLINK-6053 > URL: https://issues.apache.org/jira/browse/FLINK-6053 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.2.0 >Reporter: Bowen Li > Fix For: 1.3.0 > > > Currently, Flink's Gauge is defined as > ```java > public interface Gauge extends Metric { > T getValue(); > } > ``` > But it doesn't make sense to have Gauge take generic types other than Number. > And it blocks I from finishing FLINK-6013, because I cannot assume Gauge is > only about Number. So the class should be like > ```java > public interface Gauge extends Metric { > T getValue(); > } > ``` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything
[ https://issues.apache.org/jira/browse/FLINK-6053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-6053: Description: Currently, Flink's Gauge is defined as ```java /** * A Gauge is a {@link Metric} that calculates a specific value at a point in time. */ public interface Gauge extends Metric { T getValue(); } ``` But it doesn't make sense to have Gauge take generic types other than Number. And it blocks I from finishing FLINK-6013, because I cannot assume Gauge is only about Number. So the class should be like ``` /** * A Gauge is a {@link Metric} that calculates a specific value at a point in time. */ public interface Gauge extends Metric { T getValue(); } ``` was: Currently, Flink's Gauge is defined as ``` /** * A Gauge is a {@link Metric} that calculates a specific value at a point in time. */ public interface Gauge extends Metric { T getValue(); } ``` But it doesn't make sense to have Gauge take generic types other than Number. So the class should be like ``` /** * A Gauge is a {@link Metric} that calculates a specific value at a point in time. */ public interface Gauge extends Metric { T getValue(); } ``` > Gauge should only take subclasses of Number, rather than everything > -- > > Key: FLINK-6053 > URL: https://issues.apache.org/jira/browse/FLINK-6053 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.2.0 >Reporter: Bowen Li > Fix For: 1.3.0 > > > Currently, Flink's Gauge is defined as > ```java > /** > * A Gauge is a {@link Metric} that calculates a specific value at a point in > time. > */ > public interface Gauge extends Metric { > T getValue(); > } > ``` > But it doesn't make sense to have Gauge take generic types other than Number. > And it blocks I from finishing FLINK-6013, because I cannot assume Gauge is > only about Number. So the class should be like > ``` > /** > * A Gauge is a {@link Metric} that calculates a specific value at a point in > time. > */ > public interface Gauge extends Metric { > T getValue(); > } > ``` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything
Bowen Li created FLINK-6053: --- Summary: Gauge should only take subclasses of Number, rather than everything Key: FLINK-6053 URL: https://issues.apache.org/jira/browse/FLINK-6053 Project: Flink Issue Type: Improvement Components: Metrics Affects Versions: 1.2.0 Reporter: Bowen Li Fix For: 1.3.0 Currently, Flink's Gauge is defined as ``` /** * A Gauge is a {@link Metric} that calculates a specific value at a point in time. */ public interface Gauge<T> extends Metric { T getValue(); } ``` But it doesn't make sense to have Gauge take generic types other than Number. So the class should be like ``` /** * A Gauge is a {@link Metric} that calculates a specific value at a point in time. */ public interface Gauge<T extends Number> extends Metric { T getValue(); } ``` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3509: [FLINK-5808] Fix Missing verification for setParal...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3509#discussion_r106040063 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java --- @@ -38,14 +38,10 @@ private final ContextEnvironment ctx; protected StreamContextEnvironment(ContextEnvironment ctx) { + super(GlobalConfiguration.loadConfiguration().getInteger( --- End diff -- @StephanEwen ping 😉 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5808) Missing verification for setParallelism and setMaxParallelism
[ https://issues.apache.org/jira/browse/FLINK-5808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925127#comment-15925127 ] ASF GitHub Bot commented on FLINK-5808: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3509#discussion_r106040063 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java --- @@ -38,14 +38,10 @@ private final ContextEnvironment ctx; protected StreamContextEnvironment(ContextEnvironment ctx) { + super(GlobalConfiguration.loadConfiguration().getInteger( --- End diff -- @StephanEwen ping 😉 > Missing verification for setParallelism and setMaxParallelism > - > > Key: FLINK-5808 > URL: https://issues.apache.org/jira/browse/FLINK-5808 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0, 1.2.1 > > > When {{setParallelism()}} is called we don't verify that it is <= than max > parallelism. Likewise, when {{setMaxParallelism()}} is called we don't check > that the new value doesn't clash with a previously set parallelism. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
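A hedged sketch of the missing verification described in the issue, using simplified standalone types rather than Flink's actual environment/config classes:
{code}
/** Hypothetical holder illustrating the checks the issue asks for (not the Flink implementation). */
class ParallelismSettings {

    private int parallelism = -1;      // -1 means "not set"
    private int maxParallelism = -1;

    void setParallelism(int parallelism) {
        if (maxParallelism > 0 && parallelism > maxParallelism) {
            throw new IllegalArgumentException(
                "parallelism (" + parallelism + ") must not exceed max parallelism (" + maxParallelism + ")");
        }
        this.parallelism = parallelism;
    }

    void setMaxParallelism(int maxParallelism) {
        if (parallelism > 0 && maxParallelism < parallelism) {
            throw new IllegalArgumentException(
                "max parallelism (" + maxParallelism + ") must not be smaller than the parallelism (" + parallelism + ")");
        }
        this.maxParallelism = maxParallelism;
    }
}
{code}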
[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924934#comment-15924934 ] ASF GitHub Bot commented on FLINK-5985: --- Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3523 It also doesn't seem to work when starting from a clean state and then redeploying from a savepoint with a changed topology, so maybe I am really screwing something up > Flink treats every task as stateful (making topology changes impossible) > > > Key: FLINK-5985 > URL: https://issues.apache.org/jira/browse/FLINK-5985 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Gyula Fora >Priority: Critical > > It seems that Flink treats every Task as stateful so changing the topology > is not possible without setting uid on every single operator. > If the topology has an iteration this is virtually impossible (or at least > gets super hacky) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3523: [FLINK-5985] Report no task states for stateless tasks in...
Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3523 It also doesn't seem to work when starting from a clean state and then redeploying from a savepoint with a changed topology, so maybe I am really screwing something up --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3523: [FLINK-5985] Report no task states for stateless tasks in...
Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3523 Hm, it doesn't seem to work on the first try. What I did: I updated the client with the new jar based on your backport branch. Redeployed the job with a savepoint (to get the new Flink version), took a savepoint and tried to redeploy with the changed topology. I still seem to get the same error. Is it possible that the previous checkpoints have an effect on this? In any case I will double check tomorrow morning and try to do the test again. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924869#comment-15924869 ] ASF GitHub Bot commented on FLINK-5985: --- Github user gyfora commented on the issue: https://github.com/apache/flink/pull/3523 Hm, it doesn't seem to work on the first try. What I did: I updated the client with the new jar based on your backport branch. Redeployed the job with a savepoint (to get the new Flink version), took a savepoint and tried to redeploy with the changed topology. I still seem to get the same error. Is it possible that the previous checkpoints have an effect on this? In any case I will double check tomorrow morning and try to do the test again. > Flink treats every task as stateful (making topology changes impossible) > > > Key: FLINK-5985 > URL: https://issues.apache.org/jira/browse/FLINK-5985 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Gyula Fora >Priority: Critical > > It seems that Flink treats every Task as stateful so changing the topology > is not possible without setting uid on every single operator. > If the topology has an iteration this is virtually impossible (or at least > gets super hacky) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
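For reference, the workaround mentioned in the issue (setting an explicit uid on every operator so state can be matched after a topology change) looks like this in the DataStream API; the IDs below are placeholders:
{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
            .uid("source-id")               // stable ID: state can be matched after topology changes
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            })
            .uid("upper-case-map")          // without an explicit uid, the auto-generated hash
            .print();                       // changes whenever the surrounding topology changes

        env.execute("uid example");
    }
}
{code}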
[jira] [Created] (FLINK-6052) Potential null access in ZooKeeperCompletedCheckpointStore#getLatestCheckpoint()
Ted Yu created FLINK-6052: - Summary: Potential null access in ZooKeeperCompletedCheckpointStore#getLatestCheckpoint() Key: FLINK-6052 URL: https://issues.apache.org/jira/browse/FLINK-6052 Project: Flink Issue Type: Bug Reporter: Ted Yu Priority: Minor {code} Tuple2, String> checkpointStateHandle = checkpointStateHandles.peekLast(); try { return retrieveCompletedCheckpoint(checkpointStateHandle); } catch (Exception e) { LOG.warn("Could not retrieve latest checkpoint. Removing it from " + "the completed checkpoint store.", e); try { // remove the checkpoint with broken state handle removeBrokenStateHandle(checkpointStateHandles.pollLast()); } catch (Exception removeException) { {code} The code should handle the case where peekLast() / pollLast() returns null. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
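A hedged sketch of the guard the issue asks for, with simplified types standing in for the actual state-handle and checkpoint classes:
{code}
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified sketch (not the actual Flink code): guard both peekLast() and pollLast(). */
public class LatestCheckpointSketch {

    static String getLatestCheckpoint(Deque<String> checkpointHandles) throws Exception {
        String latest = checkpointHandles.peekLast();
        if (latest == null) {
            return null;                      // empty store: nothing to retrieve, nothing to remove
        }
        try {
            return retrieveCompletedCheckpoint(latest);
        } catch (Exception e) {
            String broken = checkpointHandles.pollLast();
            if (broken != null) {
                // remove the checkpoint with the broken state handle here
            }
            throw e;
        }
    }

    private static String retrieveCompletedCheckpoint(String handle) {
        return handle;                        // stands in for the real retrieval logic
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getLatestCheckpoint(new ArrayDeque<String>()));  // prints "null" instead of throwing
    }
}
{code}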
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924834#comment-15924834 ] ASF GitHub Bot commented on FLINK-3930: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/2425 @StephanEwen keep in mind that Flink's current SSL support doesn't achieve _mutual authentication_ - there's no client certificate there. With SSL enabled, an untrusted client can launch jobs in your Flink cluster and thus gain access to the Kerberos credential associated with the cluster. SSL mutual authentication is a good alternative to a shared secret, but at the time we were limited to built-in Akka functionality (which doesn't include mutual auth). Given the "flakka" fork that's now in place, a pure SSL solution might now be possible (I haven't thought it through completely). The fact remains that, today, _all the secrets known to a Flink job are exposed to everyone who can connect to the cluster's endpoint_. It would be nice to construct a holistic plan that worked out how the Web UI would support authentication and also incorporated FLIP-6. Both YARN and Mesos interpose a web proxy for the UI with its own limitations, notably no support for SSL mutual auth. > Implement Service-Level Authorization > - > > Key: FLINK-3930 > URL: https://issues.apache.org/jira/browse/FLINK-3930 > Project: Flink > Issue Type: New Feature > Components: Security >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Service-level authorization is the initial authorization mechanism to ensure > clients (or servers) connecting to the Flink cluster are authorized to do so. > The purpose is to prevent a cluster from being used by an unauthorized > user, whether to execute jobs, disrupt cluster functionality, or gain access > to secrets stored within the cluster. > Implement service-level authorization as described in the design doc. > - Introduce a shared secret cookie > - Enable Akka security cookie > - Implement data transfer authentication > - Secure the web dashboard -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...
Github user EronWright commented on the issue: https://github.com/apache/flink/pull/2425 @StephanEwen keep in mind that Flink's current SSL support doesn't achieve _mutual authentication_ - there's no client certificate there. With SSL enabled, an untrusted client can launch jobs in your Flink cluster and thus gain access to the Kerberos credential associated with the cluster. SSL mutual authentication is a good alternative to a shared secret, but at the time we were limited to built-in Akka functionality (which doesn't include mutual auth). Given the "flakka" fork that's now in place, a pure SSL solution might now be possible (I haven't thought it through completely). The fact remains that, today, _all the secrets known to a Flink job are exposed to everyone who can connect to the cluster's endpoint_. It would be nice to construct a holistic plan that worked out how the Web UI would support authentication and also incorporated FLIP-6. Both YARN and Mesos interpose a web proxy for the UI with its own limitations, notably no support for SSL mutual auth. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6000) Can not start HA cluster with start-cluster.sh
[ https://issues.apache.org/jira/browse/FLINK-6000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924751#comment-15924751 ] ASF GitHub Bot commented on FLINK-6000: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/3506 @uce You replied to my message on the mailing list, so maybe you could have a look at this PR ;) Recently another user reported it at SO: http://stackoverflow.com/questions/42793598/flink-1-2-does-not-start-in-ha-cluster-mode > Can not start HA cluster with start-cluster.sh > -- > > Key: FLINK-6000 > URL: https://issues.apache.org/jira/browse/FLINK-6000 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.2.0 >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz > > Right know it is impossible to start a cluster in zookeeper HA mode as > described in the documentation by setting: > in con/flink-conf.yaml: > {code} > high-availability: zookeeper > ... > {code} > in conf/masters: > {code} > localhost:8081 > localhost:8082 > {code} > The problem is with the {{bin/config.sh}} file. If value "zookeeper" is read > from config file the variable {{HIGH_AVAILABILITY}} will be reset to "none" > with the else branch. See the below code: > {code} > if [ -z "${HIGH_AVAILABILITY}" ]; then > HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" > "${YAML_CONF}") > if [ -z "${HIGH_AVAILABILITY}" ]; then > # Try deprecated value > DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}") > if [ -z "${DEPRECATED_HA}" ]; then > HIGH_AVAILABILITY="none" > elif [ ${DEPRECATED_HA} == "standalone" ]; then > # Standalone is now 'none' > HIGH_AVAILABILITY="none" > else > HIGH_AVAILABILITY=${DEPRECATED_HA} > fi > else > HIGH_AVAILABILITY="none" <-- it exits here > fi > fi > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3506: [FLINK-6000] Fix starting HA cluster with start-cluster.s...
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/3506 @uce You replied to my message on the mailing list, so maybe you could have a look at this PR ;) Recently another user reported it at SO: http://stackoverflow.com/questions/42793598/flink-1-2-does-not-start-in-ha-cluster-mode --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Resolved] (FLINK-3398) Flink Kafka consumer should support auto-commit opt-outs
[ https://issues.apache.org/jira/browse/FLINK-3398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-3398. Resolution: Fixed Fixed for 1.3.0 with http://git-wip-us.apache.org/repos/asf/flink/commit/90c7415. > Flink Kafka consumer should support auto-commit opt-outs > > > Key: FLINK-3398 > URL: https://issues.apache.org/jira/browse/FLINK-3398 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Shikhar Bhushan >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > Currently the Kafka source will commit consumer offsets to Zookeeper, either > upon a checkpoint if checkpointing is enabled, otherwise periodically based > on {{auto.commit.interval.ms}} > It should be possible to opt-out of committing consumer offsets to Zookeeper. > Kafka has this config as {{auto.commit.enable}} (0.8) and > {{enable.auto.commit}} (0.9). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
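Usage-wise, the opt-out resolved here looks roughly like the following for the 0.9+ consumer; broker address, group, and topic are placeholders:
{code}
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaCommitOptOut {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");
        // without checkpointing, periodic committing follows the Kafka client property:
        props.setProperty("enable.auto.commit", "false");

        FlinkKafkaConsumer09<String> consumer =
                new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);

        // with checkpointing enabled, committing offsets back to Kafka on checkpoints
        // can now be switched off explicitly:
        consumer.setCommitOffsetsOnCheckpoints(false);
    }
}
{code}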
[jira] [Commented] (FLINK-6001) NPE on TumblingEventTimeWindows with ContinuousEventTimeTrigger and allowedLateness
[ https://issues.apache.org/jira/browse/FLINK-6001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924615#comment-15924615 ] Vladislav Pernin commented on FLINK-6001: - Maybe related to FLINK-5713 ? I have to test. > NPE on TumblingEventTimeWindows with ContinuousEventTimeTrigger and > allowedLateness > --- > > Key: FLINK-6001 > URL: https://issues.apache.org/jira/browse/FLINK-6001 > Project: Flink > Issue Type: Bug > Components: DataStream API, Streaming >Affects Versions: 1.2.0 >Reporter: Vladislav Pernin >Priority: Critical > > I try to isolate the problem in a small and simple reproducer by extracting > the data from my real setup. > I fails with NPE at : > {noformat} > java.lang.NullPointerException: null > at > org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger.onEventTime(ContinuousEventTimeTrigger.java:81) > ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0] > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onEventTime(WindowOperator.java:721) > ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0] > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:425) > ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0] > at > org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:276) > ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:858) > ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0] > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168) > ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0] > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63) > ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272) > ~[flink-streaming-java_2.11-1.2.0.jar:1.2.0] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > ~[flink-runtime_2.11-1.2.0.jar:1.2.0] > at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121] > {noformat} > It fails only with the Thread.sleep. If you uncomment it, it won't fail. > So, you may have to increase the sleep time depending of your environment. > I know this is not a very rigourous test, but this is the only way I've found > to reproduce it. > You can find the reproducer here : > https://github.com/vpernin/flink-window-npe -- This message was sent by Atlassian JIRA (v6.3.15#6346)
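For context, the failing combination boils down to a pipeline of roughly this shape; this is a sketch assembled from the issue description, using IngestionTimeExtractor in place of the reproducer's custom watermark assigner:
{code}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

public class TriggerLatenessSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.fromElements(Tuple2.of("key", 1), Tuple2.of("key", 2))
           // assigns the current system time as the event timestamp, mirroring the reproducer
           .assignTimestampsAndWatermarks(new IngestionTimeExtractor<Tuple2<String, Integer>>())
           .keyBy(0)
           .window(TumblingEventTimeWindows.of(Time.seconds(20)))
           // the combination reported to fail: a continuous event-time trigger plus lateness
           .trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)))
           .allowedLateness(Time.seconds(30))
           .sum(1)
           .print();

        env.execute("FLINK-6001 style pipeline (sketch)");
    }
}
{code}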
[jira] [Resolved] (FLINK-5949) Flink on YARN checks for Kerberos credentials for non-Kerberos authentication methods
[ https://issues.apache.org/jira/browse/FLINK-5949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai resolved FLINK-5949. Resolution: Fixed Fixed for 1.3.0 with http://git-wip-us.apache.org/repos/asf/flink/commit/87779ad. Fixed for 1.2.1 with http://git-wip-us.apache.org/repos/asf/flink/commit/0c532ed. > Flink on YARN checks for Kerberos credentials for non-Kerberos authentication > methods > - > > Key: FLINK-5949 > URL: https://issues.apache.org/jira/browse/FLINK-5949 > Project: Flink > Issue Type: Bug > Components: Security, YARN >Affects Versions: 1.2.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.3.0, 1.2.1 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Yarn-and-MapR-Kerberos-issue-td11996.html > The problem is that the Flink on YARN client incorrectly assumes > {{UserGroupInformation.isSecurityEnabled()}} returns {{true}} only for > Kerberos authentication modes, whereas it actually returns {{true}} for other > kinds of authentications too. > We could make use of {{UserGroupInformation.getAuthenticationMethod()}} to > check for {{KERBEROS}} only. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
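The fix described above amounts to a check along these lines; a hedged sketch against Hadoop's UserGroupInformation API, not the exact Flink code:
{code}
import java.io.IOException;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;

/** Sketch: only require Kerberos credentials when Kerberos is actually the auth method. */
public class KerberosCheckSketch {

    static boolean requiresKerberosCredentials() throws IOException {
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        // isSecurityEnabled() is true for any secured authentication method,
        // so check explicitly for KERBEROS before demanding Kerberos credentials
        return ugi.getAuthenticationMethod() == AuthenticationMethod.KERBEROS
                && !ugi.hasKerberosCredentials();
    }
}
{code}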
[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore
[ https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924608#comment-15924608 ] ASF GitHub Bot commented on FLINK-6006: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3507 Doing another Travis run locally before merging just to be safe: https://travis-ci.org/tzulitai/flink/builds/211031758 > Kafka Consumer can lose state if queried partition list is incomplete on > restore > > > Key: FLINK-6006 > URL: https://issues.apache.org/jira/browse/FLINK-6006 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.1.5, 1.2.1 > > > In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying > on restore. Then, only restored state of partitions that exists in the > queried list is used to initialize the fetcher's state holders. > If in any case the returned partition list is incomplete (i.e. missing > partitions that existed before, perhaps due to temporary ZK / broker > downtime), then the state of the missing partitions is dropped and cannot be > recovered anymore. > In 1.3-SNAPSHOT, this is fixed by changes in FLINK-4280, so only 1.1 and 1.2 > is affected. > We can backport some of the behavioural changes there to 1.1 and 1.2. > Generally, we should not depend on the current partition list in Kafka when > restoring, but just restore all previous state into the fetcher's state > holders. > This would therefore also require some checking on how the consumer threads / > Kafka clients behave when its assigned partitions cannot be reached. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
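The restore rule described in the issue, reduced to a sketch with placeholder types (the real connector uses its own partition and offset classes):
{code}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the restore principle; not the actual connector code. */
public class RestoreSketch {

    /**
     * Seed the offsets to start from purely from the restored state. Partitions missing
     * from the freshly queried list are kept anyway instead of being dropped.
     */
    static Map<String, Long> seedSubscribedPartitions(
            Map<String, Long> restoredOffsets,      // state from the savepoint / checkpoint
            List<String> queriedPartitions) {       // possibly incomplete list from Kafka

        Map<String, Long> subscribed = new HashMap<>(restoredOffsets);

        // newly discovered partitions without restored state can still be added,
        // here represented by a null offset meaning "start from the configured position"
        for (String partition : queriedPartitions) {
            subscribed.putIfAbsent(partition, null);
        }
        return subscribed;
    }
}
{code}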
[GitHub] flink issue #3507: [backport-1.1] [FLINK-6006] [kafka] Always use complete r...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3507 Doing another Travis run locally before merging just to be safe: https://travis-ci.org/tzulitai/flink/builds/211031758 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5481) Simplify Row creation
[ https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924605#comment-15924605 ] ASF GitHub Bot commented on FLINK-5481: --- Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r105972317 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -17,29 +17,34 @@ */ package org.apache.flink.table.api -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.typeutils.TimeIntervalTypeInfo +import org.apache.flink.api.java.typeutils.{Types => JTypes} /** * This class enumerates all supported types of the Table API. */ -object Types { +object Types extends JTypes { - val STRING = BasicTypeInfo.STRING_TYPE_INFO - val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO + val STRING = JTypes.STRING + val BOOLEAN = JTypes.BOOLEAN - val BYTE = BasicTypeInfo.BYTE_TYPE_INFO - val SHORT = BasicTypeInfo.SHORT_TYPE_INFO - val INT = BasicTypeInfo.INT_TYPE_INFO - val LONG = BasicTypeInfo.LONG_TYPE_INFO - val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO - val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO - val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO + val BYTE = JTypes.BYTE + val SHORT = JTypes.SHORT + val INT = JTypes.INT + val LONG = JTypes.LONG + val FLOAT = JTypes.FLOAT + val DOUBLE = JTypes.DOUBLE + val DECIMAL = JTypes.DECIMAL - val DATE = SqlTimeTypeInfo.DATE - val TIME = SqlTimeTypeInfo.TIME - val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP + val DATE = JTypes.DATE + val TIME = JTypes.TIME + val TIMESTAMP = JTypes.TIMESTAMP val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + def ROW(types: TypeInformation[_]*) = JTypes.ROW(types: _*) + + def ROW(fieldNames: Array[String], types: TypeInformation[_]*) = --- End diff -- Done > Simplify Row creation > - > > Key: FLINK-5481 > URL: https://issues.apache.org/jira/browse/FLINK-5481 > Project: Flink > Issue Type: Bug > Components: DataSet API, Table API & SQL >Affects Versions: 1.2.0 >Reporter: Anton Solovev >Assignee: Anton Solovev >Priority: Trivial > > When we use {{ExecutionEnvironment#fromElements(X... data)}} it takes first > element of {{data}} to define a type. If first Row in collection has wrong > number of fields (there are nulls) TypeExtractor returns not RowTypeInfo, but > GenericType -- This message was sent by Atlassian JIRA (v6.3.15#6346)
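To make the quoted problem concrete, a small sketch of the pitfall and the explicit-type workaround, written directly against RowTypeInfo (the Types helpers added by this PR would make the last step shorter):
{code}
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class RowCreationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The first element has a null field, so the TypeExtractor cannot derive a proper
        // RowTypeInfo from it and falls back to a GenericType, as described in the issue:
        DataSet<Row> extracted = env.fromElements(Row.of(null, "a"), Row.of(1, "b"));

        // Passing the type information explicitly avoids the problem:
        RowTypeInfo rowType = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        DataSet<Row> explicit = env.fromCollection(
                Arrays.asList(Row.of(null, "a"), Row.of(1, "b")), rowType);

        explicit.print();
    }
}
{code}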
[jira] [Commented] (FLINK-5481) Simplify Row creation
[ https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924603#comment-15924603 ] ASF GitHub Bot commented on FLINK-5481: --- Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r105972293 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/Types.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; + +public class Types { + + public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; + public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; + public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; + public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; + public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; + public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + + public static final SqlTimeTypeInfo DATE = SqlTimeTypeInfo.DATE; --- End diff -- Done > Simplify Row creation > - > > Key: FLINK-5481 > URL: https://issues.apache.org/jira/browse/FLINK-5481 > Project: Flink > Issue Type: Bug > Components: DataSet API, Table API & SQL >Affects Versions: 1.2.0 >Reporter: Anton Solovev >Assignee: Anton Solovev >Priority: Trivial > > When we use {{ExecutionEnvironment#fromElements(X... data)}} it takes first > element of {{data}} to define a type. If first Row in collection has wrong > number of fields (there are nulls) TypeExtractor returns not RowTypeInfo, but > GenericType -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation
Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r105972293 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/Types.java --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; + +public class Types { + + public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; + public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; + public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; + public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; + public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; + public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + + public static final SqlTimeTypeInfo DATE = SqlTimeTypeInfo.DATE; --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation
Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/3127#discussion_r105972317 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -17,29 +17,34 @@ */ package org.apache.flink.table.api -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.typeutils.TimeIntervalTypeInfo +import org.apache.flink.api.java.typeutils.{Types => JTypes} /** * This class enumerates all supported types of the Table API. */ -object Types { +object Types extends JTypes { - val STRING = BasicTypeInfo.STRING_TYPE_INFO - val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO + val STRING = JTypes.STRING + val BOOLEAN = JTypes.BOOLEAN - val BYTE = BasicTypeInfo.BYTE_TYPE_INFO - val SHORT = BasicTypeInfo.SHORT_TYPE_INFO - val INT = BasicTypeInfo.INT_TYPE_INFO - val LONG = BasicTypeInfo.LONG_TYPE_INFO - val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO - val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO - val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO + val BYTE = JTypes.BYTE + val SHORT = JTypes.SHORT + val INT = JTypes.INT + val LONG = JTypes.LONG + val FLOAT = JTypes.FLOAT + val DOUBLE = JTypes.DOUBLE + val DECIMAL = JTypes.DECIMAL - val DATE = SqlTimeTypeInfo.DATE - val TIME = SqlTimeTypeInfo.TIME - val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP + val DATE = JTypes.DATE + val TIME = JTypes.TIME + val TIMESTAMP = JTypes.TIMESTAMP val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS + def ROW(types: TypeInformation[_]*) = JTypes.ROW(types: _*) + + def ROW(fieldNames: Array[String], types: TypeInformation[_]*) = --- End diff -- Done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1579) Create a Flink History Server
[ https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924598#comment-15924598 ] ASF GitHub Bot commented on FLINK-1579: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3460 @uce I've addressed most of your comments. Still missing: some javadocs, renaming of the web-ui, and bounding the size of the map in the ArchiveFetcher. > Create a Flink History Server > - > > Key: FLINK-1579 > URL: https://issues.apache.org/jira/browse/FLINK-1579 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > Right now its not possible to analyze the job results for jobs that ran on > YARN, because we'll loose the information once the JobManager has stopped. > Therefore, I propose to implement a "Flink History Server" which serves the > results from these jobs. > I haven't started thinking about the implementation, but I suspect it > involves some JSON files stored in HDFS :) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3460: [FLINK-1579] Implement History Server
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3460 @uce I've addressed most of your comments. Still missing: some javadocs, renaming of the web-ui, and bounding the size of the map in the ArchiveFetcher. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3538: FLINK-6051: Correct metrics scope names
GitHub user hadronzoo opened a pull request: https://github.com/apache/flink/pull/3538 FLINK-6051: Correct metrics scope names Closes FLINK-6051. You can merge this pull request into a Git repository by running: $ git pull https://github.com/orgsync/flink FLINK-6051 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3538.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3538 commit ab4332801428e14b8077a2a7669a51f8a1607ca3 Author: Joshua Griffith Date: 2017-03-14T17:10:12Z Correct metrics scope names Closes FLINK-6051. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore
[ https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924586#comment-15924586 ] ASF GitHub Bot commented on FLINK-6006: --- Github user tzulitai closed the pull request at: https://github.com/apache/flink/pull/3505 > Kafka Consumer can lose state if queried partition list is incomplete on > restore > > > Key: FLINK-6006 > URL: https://issues.apache.org/jira/browse/FLINK-6006 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.1.5, 1.2.1 > > > In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying > on restore. Then, only restored state of partitions that exists in the > queried list is used to initialize the fetcher's state holders. > If in any case the returned partition list is incomplete (i.e. missing > partitions that existed before, perhaps due to temporary ZK / broker > downtime), then the state of the missing partitions is dropped and cannot be > recovered anymore. > In 1.3-SNAPSHOT, this is fixed by changes in FLINK-4280, so only 1.1 and 1.2 > is affected. > We can backport some of the behavioural changes there to 1.1 and 1.2. > Generally, we should not depend on the current partition list in Kafka when > restoring, but just restore all previous state into the fetcher's state > holders. > This would therefore also require some checking on how the consumer threads / > Kafka clients behave when its assigned partitions cannot be reached. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6051) Wrong metric scope names in documentation
[ https://issues.apache.org/jira/browse/FLINK-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924587#comment-15924587 ] ASF GitHub Bot commented on FLINK-6051: --- GitHub user hadronzoo opened a pull request: https://github.com/apache/flink/pull/3538 FLINK-6051: Correct metrics scope names Closes FLINK-6051. You can merge this pull request into a Git repository by running: $ git pull https://github.com/orgsync/flink FLINK-6051 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3538.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3538 commit ab4332801428e14b8077a2a7669a51f8a1607ca3 Author: Joshua Griffith Date: 2017-03-14T17:10:12Z Correct metrics scope names Closes FLINK-6051. > Wrong metric scope names in documentation > - > > Key: FLINK-6051 > URL: https://issues.apache.org/jira/browse/FLINK-6051 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.2.0 >Reporter: Joshua Griffith >Priority: Minor > > FLINK-4402 fixed the following metrics names: > metrics.scope.tm.task → metrics.scope.task > metrics.scope.tm.operator → metrics.scope.operator > However, the [1.2.0 > documentation|https://github.com/apache/flink/blob/dabeb74c10e755c655a06cdc8846dc7227d63cb9/docs/setup/config.md#metrics] > lists the incorrect metric names again. > As a side note, it would be convenient to have all of the configuration > options documented in a machine readable format. To build docker images I > [process the config file with > sed|https://github.com/orgsync/docker-flink/blob/a08ec4d102b623bfba74a61d3012931e28ef92e7/Dockerfile#L36] > to extract the options and turn them into env vars, which is how I noticed > this issue. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3505: [backport-1.2] [FLINK-6006] [kafka] Always use com...
Github user tzulitai closed the pull request at: https://github.com/apache/flink/pull/3505 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3398) Flink Kafka consumer should support auto-commit opt-outs
[ https://issues.apache.org/jira/browse/FLINK-3398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924570#comment-15924570 ] ASF GitHub Bot commented on FLINK-3398: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3527 > Flink Kafka consumer should support auto-commit opt-outs > > > Key: FLINK-3398 > URL: https://issues.apache.org/jira/browse/FLINK-3398 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Shikhar Bhushan >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > Currently the Kafka source will commit consumer offsets to Zookeeper, either > upon a checkpoint if checkpointing is enabled, otherwise periodically based > on {{auto.commit.interval.ms}} > It should be possible to opt-out of committing consumer offsets to Zookeeper. > Kafka has this config as {{auto.commit.enable}} (0.8) and > {{enable.auto.commit}} (0.9). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5949) Flink on YARN checks for Kerberos credentials for non-Kerberos authentication methods
[ https://issues.apache.org/jira/browse/FLINK-5949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924569#comment-15924569 ] ASF GitHub Bot commented on FLINK-5949: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3528 > Flink on YARN checks for Kerberos credentials for non-Kerberos authentication > methods > - > > Key: FLINK-5949 > URL: https://issues.apache.org/jira/browse/FLINK-5949 > Project: Flink > Issue Type: Bug > Components: Security, YARN >Affects Versions: 1.2.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.3.0, 1.2.1 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Yarn-and-MapR-Kerberos-issue-td11996.html > The problem is that the Flink on YARN client incorrectly assumes > {{UserGroupInformation.isSecurityEnabled()}} returns {{true}} only for > Kerberos authentication modes, whereas it actually returns {{true}} for other > kinds of authentications too. > We could make use of {{UserGroupInformation.getAuthenticationMethod()}} to > check for {{KERBEROS}} only. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6051) Wrong metric scope names in documentation
Joshua Griffith created FLINK-6051: -- Summary: Wrong metric scope names in documentation Key: FLINK-6051 URL: https://issues.apache.org/jira/browse/FLINK-6051 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.2.0 Reporter: Joshua Griffith Priority: Minor FLINK-4402 fixed the following metrics names: metrics.scope.tm.task → metrics.scope.task metrics.scope.tm.operator → metrics.scope.operator However, the [1.2.0 documentation|https://github.com/apache/flink/blob/dabeb74c10e755c655a06cdc8846dc7227d63cb9/docs/setup/config.md#metrics] lists the incorrect metric names again. As a side note, it would be convenient to have all of the configuration options documented in a machine readable format. To build docker images I [process the config file with sed|https://github.com/orgsync/docker-flink/blob/a08ec4d102b623bfba74a61d3012931e28ef92e7/Dockerfile#L36] to extract the options and turn them into env vars, which is how I noticed this issue. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3528: [FLINK-5949] [yarn] Don't check Kerberos credentia...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3528 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3527: [FLINK-3398] [kafka] Allow disabling offset commit...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3527 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3527: [FLINK-3398] [kafka] Allow disabling offset committing fo...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3527 Did a final test run on a Kafka installation, and things worked as expected. One minor improvement would be to add a log message stating exactly which commit mode is used when it is determined in `open()`. I think it's a safe call to add the log and then merge this :-) Will proceed to merge for `master`. Thanks for all the recent reviews @rmetzger :-D --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5756) When there are many values under the same key in ListState, RocksDBStateBackend performances poor
[ https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924537#comment-15924537 ] Stephan Ewen commented on FLINK-5756: - I would suggest to see if we get a response from the RocksDB community. If we cannot expect a fix soon, we will have to build around that using the "range iterator" workaround described above. > When there are many values under the same key in ListState, > RocksDBStateBackend performances poor > - > > Key: FLINK-5756 > URL: https://issues.apache.org/jira/browse/FLINK-5756 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 > Environment: CentOS 7.2 >Reporter: Syinchwun Leo > > When using RocksDB as the StateBackend, if there are many values under the > same key in ListState, the windowState.get() operator performances very poor. > I also the the RocksDB using version 4.11.2, the performance is also very > poor. The problem is likely to related to RocksDB itself's get() operator > after using merge(). The problem may influences the window operation's > performance when the size is very large using ListState. I try to merge 5 > values under the same key in RocksDB, It costs 120 seconds to execute get() > operation. > /// > The flink's code is as follows: > {code} > class SEventSource extends RichSourceFunction [SEvent] { > private var count = 0L > private val alphabet = > "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" > override def run(sourceContext: SourceContext[SEvent]): Unit = { > while (true) { > for (i <- 0 until 5000) { > sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) > count += 1L > } > Thread.sleep(1000) > } > } > } > env.addSource(new SEventSource) > .assignTimestampsAndWatermarks(new > AssignerWithPeriodicWatermarks[SEvent] { > override def getCurrentWatermark: Watermark = { > new Watermark(System.currentTimeMillis()) > } > override def extractTimestamp(t: SEvent, l: Long): Long = { > System.currentTimeMillis() > } > }) > .keyBy(0) > .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2))) > .apply(new WindowStatistic) > .map(x => (System.currentTimeMillis(), x)) > .print() > {code} > > The RocksDB Test code: > {code} > val stringAppendOperator = new StringAppendOperator > val options = new Options() > options.setCompactionStyle(CompactionStyle.LEVEL) > .setCompressionType(CompressionType.SNAPPY_COMPRESSION) > .setLevelCompactionDynamicLevelBytes(true) > .setIncreaseParallelism(4) > .setUseFsync(true) > .setMaxOpenFiles(-1) > .setCreateIfMissing(true) > .setMergeOperator(stringAppendOperator) > val write_options = new WriteOptions > write_options.setSync(false) > val rocksDB = RocksDB.open(options, "/**/Data/") > val key = "key" > val value = > "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321" > val beginmerge = System.currentTimeMillis() > for(i <- 0 to 5) { > rocksDB.merge(key.getBytes(), ("s"+ i + value).getBytes()) > //rocksDB.put(key.getBytes, value.getBytes) > } > println("finish") > val begin = System.currentTimeMillis() > rocksDB.get(key.getBytes) > val end = System.currentTimeMillis() > println("merge cost:" + (begin - beginmerge)) > println("Time consuming:" + (end - begin)) > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
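A hedged sketch of what the "range iterator" workaround could look like if it becomes necessary: store each list element under its own key (list key plus element index) instead of merge()-ing everything into one value, and read the list back with a prefix iteration. The key layout is illustrative, a recent RocksDB JNI version is assumed, and this is not Flink's actual state backend code:
{code}
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public class RangeIteratorSketch {

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/range-iterator-sketch")) {

            String listKey = "key";
            for (int i = 0; i < 1000; i++) {
                // one RocksDB entry per list element, ordered by the zero-padded index
                byte[] elementKey = String.format("%s#%010d", listKey, i).getBytes(StandardCharsets.UTF_8);
                db.put(elementKey, ("value-" + i).getBytes(StandardCharsets.UTF_8));
            }

            // read the list back by seeking to the prefix and iterating until it no longer matches
            List<String> values = new ArrayList<>();
            byte[] prefix = (listKey + "#").getBytes(StandardCharsets.UTF_8);
            try (RocksIterator iter = db.newIterator()) {
                for (iter.seek(prefix); iter.isValid() && startsWith(iter.key(), prefix); iter.next()) {
                    values.add(new String(iter.value(), StandardCharsets.UTF_8));
                }
            }
            System.out.println("read " + values.size() + " elements");
        }
    }

    private static boolean startsWith(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }
}
{code}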
[jira] [Commented] (FLINK-4545) Flink automatically manages TM network buffer
[ https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924535#comment-15924535 ] Nico Kruber commented on FLINK-4545: We did some thinking and would probably add the following three new configuration parameters (with the given defaults) to finally replace the {{taskmanager.network.numberOfBuffers}} parameter: - {{taskmanager.network.memory.fraction}} (default: 0.1): fraction of JVM memory to use for network buffers (by reducing {{taskmanager.memory.fraction}} from 0.7 to 0.6) - {{taskmanager.network.memory.min}} (default: 64MB): minimum memory size for network buffers - {{taskmanager.network.memory.max}} (default: 1GB): maximum memory size for network buffers A fixed size may be achieved by setting the latter two to the same value; {{taskmanager.network.numberOfBuffers}} will be marked deprecated and used only if the other three are not given, e.g. due to old config files still being used. > Flink automatically manages TM network buffer > - > > Key: FLINK-4545 > URL: https://issues.apache.org/jira/browse/FLINK-4545 > Project: Flink > Issue Type: Wish > Components: Network >Reporter: Zhenzhong Xu > > Currently, the number of network buffers per task manager is preconfigured and > the memory is pre-allocated through the taskmanager.network.numberOfBuffers > config. In a Job DAG with a shuffle phase, this number can grow very large > depending on the TM cluster size. The formula for calculating the buffer count > is documented here > (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers). > > #slots-per-TM^2 * #TMs * 4 > In a standalone deployment, we may need to control the task manager cluster > size dynamically and then leverage the upcoming Flink feature to support > scaling job parallelism/rescaling at runtime. > If the buffer count config is static at runtime and cannot be changed without > restarting the task manager process, this may add latency and complexity to the > scaling process. I am wondering if there is already any discussion around > whether the network buffers should be automatically managed by Flink, or at > least whether an API should be exposed to allow them to be reconfigured. Let me know if there is > any existing JIRA that I should follow. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
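As a rough illustration of how the proposed options would be used, here is a minimal sketch that sets them programmatically on a Flink `Configuration`. The keys come from the proposal above, but whether they exist, their defaults, and the exact value representation (bytes are assumed here for min/max) depend on the Flink version, so treat this as an assumption-laden example rather than reference configuration.

```java
import org.apache.flink.configuration.Configuration;

// Illustrative only: setting the proposed network-buffer memory options.
public class NetworkBufferConfigSketch {

    public static Configuration networkBufferConfig() {
        Configuration config = new Configuration();
        // Use 10% of the JVM memory for network buffers (the proposed default).
        config.setFloat("taskmanager.network.memory.fraction", 0.1f);
        // Lower and upper bounds, expressed here in bytes (64 MB and 1 GB).
        config.setLong("taskmanager.network.memory.min", 64L << 20);
        config.setLong("taskmanager.network.memory.max", 1L << 30);
        // Setting min == max would result in a fixed-size network buffer pool,
        // as described in the comment above.
        return config;
    }
}
```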
[GitHub] flink issue #3528: [FLINK-5949] [yarn] Don't check Kerberos credentials for ...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3528 Thanks for the review :-) The failing tests seem to be due to some instability with Maven. Merging this to `master` and `release-1.2` .. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6050) Improve failure reporting when using Future.thenAccept
[ https://issues.apache.org/jira/browse/FLINK-6050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924518#comment-15924518 ] ASF GitHub Bot commented on FLINK-6050: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3537 [FLINK-6050] [robustness] Register exception handler on thenAccept futures When applying an AcceptFunction on a Future x, we should register the exception handler on the returned thenAccept future instead of on x. This has the advantage that we also catch exceptions which are thrown inside of the AcceptFunction and not only those which originate from x. The PR adapts the code accordingly. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink hardenAcceptFutureCalls Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3537.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3537 > Improve failure reporting when using Future.thenAccept > -- > > Key: FLINK-6050 > URL: https://issues.apache.org/jira/browse/FLINK-6050 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > When applying {{Future.thenAccept(Async)}} onto a {{Future}}, we should > register the exception handler on the returned {{Future}} and not on > the original future. This has the advantage that we also catch exceptions > which are thrown in the {{AcceptFunction}} and not only those originating > from the original {{Future}}. This improves Flink's behaviour, because > exceptions are no longer swallowed in the returned {{Future}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3537: [FLINK-6050] [robustness] Register exception handl...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3537 [FLINK-6050] [robustness] Register exception handler on thenAccept futures When applying an AcceptFunction on a Future x, we should register the exception handler on the returned thenAccept future instead of on x. This has the advantage that we also catch exceptions which are thrown inside of the AcceptFunction and not only those which originate from x. The PR adapts the code accordingly. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink hardenAcceptFutureCalls Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3537.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3537 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
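Since the PR concerns Flink's internal Future abstraction, the following sketch only illustrates the same principle using the standard `java.util.concurrent.CompletableFuture` as an analogy; it is not Flink's code. Registering the handler on the future returned by `thenAccept` means a failure thrown inside the accept callback is also reported, whereas a handler registered on the source future would miss it.

```java
import java.util.concurrent.CompletableFuture;

public class ThenAcceptHandlerSketch {

    public static void main(String[] args) {
        CompletableFuture<String> source = CompletableFuture.completedFuture("value");

        // The accept callback itself fails.
        CompletableFuture<Void> accepted = source.thenAccept(value -> {
            throw new RuntimeException("failure inside the accept function");
        });

        // Registered on 'accepted' (the returned future), so the exception above is reported.
        // Had it been registered on 'source', the failure would be silently swallowed.
        accepted.exceptionally(t -> {
            System.err.println("Caught: " + t);
            return null;
        });
    }
}
```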
[jira] [Commented] (FLINK-3398) Flink Kafka consumer should support auto-commit opt-outs
[ https://issues.apache.org/jira/browse/FLINK-3398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924511#comment-15924511 ] ASF GitHub Bot commented on FLINK-3398: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3527 Did a final test run on a Kafka installation, and things worked as expected. One minor improvement would be to add a log stating exactly which commit mode is used, once it is determined in `open()`. I think it's a safe call to add the log and then merge this :-) Will proceed to merge to `master`. Thanks for all the recent reviews @rmetzger :-D > Flink Kafka consumer should support auto-commit opt-outs > > > Key: FLINK-3398 > URL: https://issues.apache.org/jira/browse/FLINK-3398 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Shikhar Bhushan >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.3.0 > > > Currently the Kafka source will commit consumer offsets to Zookeeper, either > upon a checkpoint if checkpointing is enabled, or otherwise periodically based > on {{auto.commit.interval.ms}}. > It should be possible to opt out of committing consumer offsets to Zookeeper. > Kafka has this config as {{auto.commit.enable}} (0.8) and > {{enable.auto.commit}} (0.9). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
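For completeness, a user-facing opt-out would most likely be expressed through the standard Kafka consumer properties named in the issue. The snippet below is a hedged sketch of such properties (the broker address and group id are placeholders); how the Flink consumer ultimately honours the flag is exactly what the PR above defines.

```java
import java.util.Properties;

public class KafkaAutoCommitOptOut {

    public static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "my-consumer-group");       // placeholder group id
        // Kafka 0.9+ key; Kafka 0.8 used "auto.commit.enable", as noted in the issue.
        props.setProperty("enable.auto.commit", "false");
        return props;
    }
}
```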
[jira] [Created] (FLINK-6050) Improve failure reporting when using Future.thenAccept
Till Rohrmann created FLINK-6050: Summary: Improve failure reporting when using Future.thenAccept Key: FLINK-6050 URL: https://issues.apache.org/jira/browse/FLINK-6050 Project: Flink Issue Type: Improvement Components: Distributed Coordination Affects Versions: 1.3.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Priority: Minor When applying {{Future.thenAccept(Async)}} onto a {{Future}}, we should register the exception handler on the returned {{Future}} and not on the original future. This has the advantage that we also catch exceptions which are thrown in the {{AcceptFunction}} and not only those originating from the original {{Future}}. This improves Flink's behaviour, because exceptions are no longer swallowed in the returned {{Future}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3505: [backport-1.2] [FLINK-6006] [kafka] Always use complete r...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3505 Merging .. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore
[ https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924512#comment-15924512 ] ASF GitHub Bot commented on FLINK-6006: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3505 Merging .. > Kafka Consumer can lose state if queried partition list is incomplete on > restore > > > Key: FLINK-6006 > URL: https://issues.apache.org/jira/browse/FLINK-6006 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.1.5, 1.2.1 > > > In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying > on restore. Then, only restored state of partitions that exist in the > queried list is used to initialize the fetcher's state holders. > If in any case the returned partition list is incomplete (i.e. missing > partitions that existed before, perhaps due to temporary ZK / broker > downtime), then the state of the missing partitions is dropped and cannot be > recovered anymore. > In 1.3-SNAPSHOT, this is fixed by the changes in FLINK-4280, so only 1.1 and 1.2 > are affected. > We can backport some of the behavioural changes there to 1.1 and 1.2. > Generally, we should not depend on the current partition list in Kafka when > restoring, but just restore all previous state into the fetcher's state > holders. > This would therefore also require some checking on how the consumer threads / > Kafka clients behave when their assigned partitions cannot be reached. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
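To make the described behavioural change concrete, here is a purely hypothetical sketch contrasting the two restore strategies; the types and method names are illustrative and do not correspond to the consumer's actual code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RestoreBehaviourSketch {

    // Old behaviour: intersect the restored offsets with the freshly queried partition
    // list, silently dropping state for partitions that are temporarily unreachable.
    static Map<String, Long> intersectWithQueriedList(
            Map<String, Long> restoredOffsets, List<String> queriedPartitions) {
        Map<String, Long> seeded = new HashMap<>();
        for (String partition : queriedPartitions) {
            Long offset = restoredOffsets.get(partition);
            if (offset != null) {
                seeded.put(partition, offset);
            }
        }
        return seeded;
    }

    // Proposed behaviour: seed the fetcher with all previously restored state,
    // independent of what the (possibly incomplete) partition query returned.
    static Map<String, Long> restoreEverything(Map<String, Long> restoredOffsets) {
        return new HashMap<>(restoredOffsets);
    }
}
```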
[jira] [Commented] (FLINK-5713) Protect against NPE in WindowOperator window cleanup
[ https://issues.apache.org/jira/browse/FLINK-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924507#comment-15924507 ] ASF GitHub Bot commented on FLINK-5713: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3535#discussion_r105956185 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -354,22 +354,27 @@ public void merge(W mergeResult, } }); - // drop if the window is already late - if (isLate(actualWindow)) { - mergingWindows.retireWindow(actualWindow); - continue; - } + context.key = key; + context.window = actualWindow; W stateWindow = mergingWindows.getStateWindow(actualWindow); if (stateWindow == null) { throw new IllegalStateException("Window " + window + " is not in in-flight window set."); } windowState.setCurrentNamespace(stateWindow); - windowState.add(element.getValue()); - context.key = key; - context.window = actualWindow; + // Drop if the window is already late. In rare cases (with a misbehaving + // WindowAssigner) it can happen that a window becomes late that already has + // state (contents, state and timers). That's why we first get the window state + // above and then drop everything. + if (isLate(actualWindow)) { + clearAllState(actualWindow, windowState, mergingWindows); + mergingWindows.persist(); --- End diff -- The null check is not needed here since since we know from the if block we're in (`if (windowAssigner instanceof MergingWindowAssigner)`) that we do in fact have merging windows. Now that I look at it, though, I realise that the `mergingWindows.persist()` call is not necessary because we already call it at the end of the if block. So thanks for making me notice! 😃 I moved the call out of `clearAllState()` in the first place because all places where `clearAllState()` are called already persist afterwards. See, for example, `onEventTime()` and `onProcessingTime()`. > Protect against NPE in WindowOperator window cleanup > > > Key: FLINK-5713 > URL: https://issues.apache.org/jira/browse/FLINK-5713 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.2.1 > > > Some (misbehaved) WindowAssigners can cause windows to be dropped from the > merging window set while a cleanup timer is still active. This will trigger a > NullPointerException when that timer fires. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3535: [FLINK-5713] Protect against NPE in WindowOperator...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3535#discussion_r105956185 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -354,22 +354,27 @@ public void merge(W mergeResult, } }); - // drop if the window is already late - if (isLate(actualWindow)) { - mergingWindows.retireWindow(actualWindow); - continue; - } + context.key = key; + context.window = actualWindow; W stateWindow = mergingWindows.getStateWindow(actualWindow); if (stateWindow == null) { throw new IllegalStateException("Window " + window + " is not in in-flight window set."); } windowState.setCurrentNamespace(stateWindow); - windowState.add(element.getValue()); - context.key = key; - context.window = actualWindow; + // Drop if the window is already late. In rare cases (with a misbehaving + // WindowAssigner) it can happen that a window becomes late that already has + // state (contents, state and timers). That's why we first get the window state + // above and then drop everything. + if (isLate(actualWindow)) { + clearAllState(actualWindow, windowState, mergingWindows); + mergingWindows.persist(); --- End diff -- The null check is not needed here since since we know from the if block we're in (`if (windowAssigner instanceof MergingWindowAssigner)`) that we do in fact have merging windows. Now that I look at it, though, I realise that the `mergingWindows.persist()` call is not necessary because we already call it at the end of the if block. So thanks for making me notice! ð I moved the call out of `clearAllState()` in the first place because all places where `clearAllState()` are called already persist afterwards. See, for example, `onEventTime()` and `onProcessingTime()`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5949) Flink on YARN checks for Kerberos credentials for non-Kerberos authentication methods
[ https://issues.apache.org/jira/browse/FLINK-5949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924505#comment-15924505 ] ASF GitHub Bot commented on FLINK-5949: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3528 Thanks for the review :-) The failing tests seem to be due to some instability with Maven. Merging this to `master` and `release-1.2` .. > Flink on YARN checks for Kerberos credentials for non-Kerberos authentication > methods > - > > Key: FLINK-5949 > URL: https://issues.apache.org/jira/browse/FLINK-5949 > Project: Flink > Issue Type: Bug > Components: Security, YARN >Affects Versions: 1.2.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.3.0, 1.2.1 > > > Reported in ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Yarn-and-MapR-Kerberos-issue-td11996.html > The problem is that the Flink on YARN client incorrectly assumes > {{UserGroupInformation.isSecurityEnabled()}} returns {{true}} only for > Kerberos authentication modes, whereas it actually returns {{true}} for other > kinds of authentication too. > We could make use of {{UserGroupInformation.getAuthenticationMethod()}} to > check for {{KERBEROS}} only. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5846) CEP: make the operators backwards compatible.
[ https://issues.apache.org/jira/browse/FLINK-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924489#comment-15924489 ] ASF GitHub Bot commented on FLINK-5846: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3445#discussion_r105936184 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTestBase.java --- @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.ByteSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.SubEvent; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import java.net.URL; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Base class for the migration tests between different versions. + * See {@link CEPMigration11to13Test} for an example of migration test + * between Flink-1.1 and Flink-1.3. + * */ --- End diff -- Should have a newline, or at least not two stars on one line 😉 > CEP: make the operators backwards compatible. > - > > Key: FLINK-5846 > URL: https://issues.apache.org/jira/browse/FLINK-5846 > Project: Flink > Issue Type: Sub-task > Components: CEP >Affects Versions: 1.3.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.3.0 > > > This targets making the new CEP operators compatible with their previous > versions from Flink 1.1 and Flink 1.2. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3445: [FLINK-5846] [cep] Make the CEP operators backward...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3445#discussion_r105936184 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTestBase.java --- @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.operator; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.ByteSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.cep.Event; +import org.apache.flink.cep.SubEvent; +import org.apache.flink.cep.nfa.NFA; +import org.apache.flink.cep.nfa.compiler.NFACompiler; +import org.apache.flink.cep.pattern.Pattern; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; + +import java.net.URL; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Base class for the migration tests between different versions. + * See {@link CEPMigration11to13Test} for an example of migration test + * between Flink-1.1 and Flink-1.3. + * */ --- End diff -- Should have a newline, or at least not two stars on one line ð --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5846) CEP: make the operators backwards compatible.
[ https://issues.apache.org/jira/browse/FLINK-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924490#comment-15924490 ] ASF GitHub Bot commented on FLINK-5846: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3445#discussion_r105937053 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.operator; + +import org.junit.Test; + +public class CEPMigration11to13Test extends CEPMigrationTestBase { + + @Test + public void testKeyedCEPFunctionMigration11() throws Exception { + testKeyedCEPFunctionMigration("cep-keyed-savepoint-1.1"); + } + + @Test + public void testNonKeyedCEPFunctionMigration11() throws Exception { + testNonKeyedCEPFunctionMigration("cep-non-keyed-savepoint-1.1"); + } +} + +/* +FLINK 1.1 CODE TO PRODUCE THE SAVEPOINTS. (INCLUDE ALSO THE PATTERN CODE AT THE BOTTOM FOR BOTH CASES) + +@Test +public void keyedCEPOperatorSavepointGen() throws Exception { + + KeySelector keySelector = new KeySelector() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + OneInputStreamOperatorTestHarness> harness = new OneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory())); + harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); + harness.open(); + + Event startEvent = new Event(42, "start", 1.0); + SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); + Event endEvent= new Event(42, "end", 1.0); + + harness.processElement(new StreamRecord(startEvent, 1)); + harness.processElement(new StreamRecord(new Event(42, "foobar", 1.0), 2)); + harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + + harness.processWatermark(new Watermark(2)); + + // simulate snapshot/restore with empty element queue but NFA state + StreamTaskState snapshot = harness.snapshot(1, 1); + + FileOutputStream out = new FileOutputStream("/Users/kkloudas/Desktop/cep-keyed-savepoint-1.1"); --- End diff -- This should be something like `"src/test/resources/cep-keyed-savepoint-1.1"`, same for the other path down below. With this, it would generate directly into the directory where the test currently is and where the test expects to find it. > CEP: make the operators backwards compatible. 
> - > > Key: FLINK-5846 > URL: https://issues.apache.org/jira/browse/FLINK-5846 > Project: Flink > Issue Type: Sub-task > Components: CEP >Affects Versions: 1.3.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.3.0 > > > This targets making the new CEP operators compatible with their previous > versions from Flink 1.1 and Flink 1.2. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3445: [FLINK-5846] [cep] Make the CEP operators backward...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3445#discussion_r105937053 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java --- @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cep.operator; + +import org.junit.Test; + +public class CEPMigration11to13Test extends CEPMigrationTestBase { + + @Test + public void testKeyedCEPFunctionMigration11() throws Exception { + testKeyedCEPFunctionMigration("cep-keyed-savepoint-1.1"); + } + + @Test + public void testNonKeyedCEPFunctionMigration11() throws Exception { + testNonKeyedCEPFunctionMigration("cep-non-keyed-savepoint-1.1"); + } +} + +/* +FLINK 1.1 CODE TO PRODUCE THE SAVEPOINTS. (INCLUDE ALSO THE PATTERN CODE AT THE BOTTOM FOR BOTH CASES) + +@Test +public void keyedCEPOperatorSavepointGen() throws Exception { + + KeySelector keySelector = new KeySelector() { + private static final long serialVersionUID = -4873366487571254798L; + + @Override + public Integer getKey(Event value) throws Exception { + return value.getId(); + } + }; + + OneInputStreamOperatorTestHarness> harness = new OneInputStreamOperatorTestHarness<>( + new KeyedCEPPatternOperator<>( + Event.createTypeSerializer(), + false, + keySelector, + IntSerializer.INSTANCE, + new NFAFactory())); + harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO); + harness.open(); + + Event startEvent = new Event(42, "start", 1.0); + SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0); + Event endEvent= new Event(42, "end", 1.0); + + harness.processElement(new StreamRecord(startEvent, 1)); + harness.processElement(new StreamRecord(new Event(42, "foobar", 1.0), 2)); + harness.processElement(new StreamRecord(new SubEvent(42, "barfoo", 1.0, 5.0), 3)); + + harness.processWatermark(new Watermark(2)); + + // simulate snapshot/restore with empty element queue but NFA state + StreamTaskState snapshot = harness.snapshot(1, 1); + + FileOutputStream out = new FileOutputStream("/Users/kkloudas/Desktop/cep-keyed-savepoint-1.1"); --- End diff -- This should be something like `"src/test/resources/cep-keyed-savepoint-1.1"`, same for the other path down below. With this, it would generate directly into the directory where the test currently is and where the test expects to find it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6049) Parallelize execution of (async) snapshots in AsyncCheckpointRunnable
Stefan Richter created FLINK-6049: - Summary: Parallelize execution of (async) snapshots in AsyncCheckpointRunnable Key: FLINK-6049 URL: https://issues.apache.org/jira/browse/FLINK-6049 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Affects Versions: 1.3.0 Reporter: Stefan Richter Fix For: 1.3.0 After the changes from [FLINK-6048], it would make sense to parallelize the execution of the `RunnableFuture`s of the (async) snapshots in `AsyncCheckpointRunnable`. Currently, `AsyncCheckpointRunnable` is a thread that runs in parallel, but executes each snapshot sequentially. The reason is that previously only keyed state backends had support for async snapshots. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
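A minimal sketch of what such a parallelization could look like, assuming the snapshots are plain `RunnableFuture`s and a shared executor is available; this is not the actual `AsyncCheckpointRunnable` code, and error handling is reduced to propagating the first failure from `get()`.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RunnableFuture;

class ParallelSnapshotSketch {

    // Runs all snapshot futures on the executor instead of one after another.
    static void runSnapshots(List<RunnableFuture<Void>> snapshots, ExecutorService executor)
            throws Exception {
        for (RunnableFuture<Void> snapshot : snapshots) {
            executor.execute(snapshot);   // start all snapshots in parallel
        }
        for (RunnableFuture<Void> snapshot : snapshots) {
            snapshot.get();               // wait for each to finish; rethrows failures
        }
    }
}
```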
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924477#comment-15924477 ] ASF GitHub Bot commented on FLINK-3930: --- Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2425 @StephanEwen The shared secret can be considered an additional security extension on top of the TLS integration: it ensures that only an authorized identity can execute actions on a running cluster. > Implement Service-Level Authorization > - > > Key: FLINK-3930 > URL: https://issues.apache.org/jira/browse/FLINK-3930 > Project: Flink > Issue Type: New Feature > Components: Security >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Service-level authorization is the initial authorization mechanism to ensure > clients (or servers) connecting to the Flink cluster are authorized to do so. > The purpose is to prevent a cluster from being used by an unauthorized > user, whether to execute jobs, disrupt cluster functionality, or gain access > to secrets stored within the cluster. > Implement service-level authorization as described in the design doc. > - Introduce a shared secret cookie > - Enable Akka security cookie > - Implement data transfer authentication > - Secure the web dashboard -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...
Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2425 @StephanEwen The shared secret can be considered an additional security extension on top of the TLS integration: it ensures that only an authorized identity can execute actions on a running cluster. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6048) Asynchronous snapshots for heap-based operator state backends
[ https://issues.apache.org/jira/browse/FLINK-6048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924467#comment-15924467 ] ASF GitHub Bot commented on FLINK-6048: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3536 CC @aljoscha @StephanEwen > Asynchronous snapshots for heap-based operator state backends > - > > Key: FLINK-6048 > URL: https://issues.apache.org/jira/browse/FLINK-6048 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.3.0 > > > The synchronous checkpointing mechanism of heap-based operator state backends > blocks element processing for the duration of the checkpoint. > We could implement a heap-based operator state backend that allows for > asynchronous checkpoints. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6048) Asynchronous snapshots for heap-based operator state backends
[ https://issues.apache.org/jira/browse/FLINK-6048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924466#comment-15924466 ] ASF GitHub Bot commented on FLINK-6048: --- GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/3536 [FLINK-6048] Asynchronous snapshots for heap-based OperatorStateBackend This PR introduces asynchronous snapshots for the heap-based `DefaultOperatorStateBackend`. Compared to the asynchronous snapshots for the heap-based keyed state backend, this implementation is rather simple and eagerly generates a deep in-memory copy of the state before running the asynchronous part of the snapshot that writes to the filesystem. Note that this PR should later sit on top of PR #3483 and piggyback on the async-flag that was introduced. Furthermore, we could have a followup that actually parallelizes checkpointing the different async backends in `AsyncCheckpointRunnable`. Previously, this was not needed because there were only keyed state backends, so those have been the only async backends. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink async-opstatebackend Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3536.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3536 commit ff0930066fd6f9a5d54c548eff73fc4f34141b6a Author: Stefan Richter Date: 2017-03-14T13:25:57Z [FLINK-6048] Implement async snapshots for DefaultOperatorStateBackend commit f38fdf7524039f2d87a4594d275959d874c4a198 Author: Stefan Richter Date: 2017-03-14T15:07:06Z Unit tests for [FLINK-6048] > Asynchronous snapshots for heap-based operator state backends > - > > Key: FLINK-6048 > URL: https://issues.apache.org/jira/browse/FLINK-6048 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Stefan Richter >Assignee: Stefan Richter > Fix For: 1.3.0 > > > The synchronous checkpointing mechanism of heap-based operator state backends > blocks element processing for the duration of the checkpoint. > We could implement a heap-based operator state backend that allows for > asynchronous checkpoints. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3536: [FLINK-6048] Asynchronous snapshots for heap-based Operat...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3536 CC @aljoscha @StephanEwen --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3536: [FLINK-6048] Asynchronous snapshots for heap-based...
GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/3536 [FLINK-6048] Asynchronous snapshots for heap-based OperatorStateBackend This PR introduces asynchronous snapshots for the heap-based `DefaultOperatorStateBackend`. Compared to the asynchronous snapshots for the heap-based keyed state backend, this implementation is rather simple and eagerly generates a deep in-memory copy of the state before running the asynchronous part of the snapshot that writes to the filesystem. Note that this PR should later sit on top of PR #3483 and piggyback on the async-flag that was introduced. Furthermore, we could have a followup that actually parallelizes checkpointing the different async backends in `AsyncCheckpointRunnable`. Previously, this was not needed because there were only keyed state backends, so those have been the only async backends. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink async-opstatebackend Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3536.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3536 commit ff0930066fd6f9a5d54c548eff73fc4f34141b6a Author: Stefan Richter Date: 2017-03-14T13:25:57Z [FLINK-6048] Implement async snapshots for DefaultOperatorStateBackend commit f38fdf7524039f2d87a4594d275959d874c4a198 Author: Stefan Richter Date: 2017-03-14T15:07:06Z Unit tests for [FLINK-6048] --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
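The copy-then-write approach described above can be illustrated with a small, assumption-laden sketch: the synchronous part takes an eager deep copy of the state, and the asynchronous part later materializes that copy while processing continues on the original. This is not the actual `DefaultOperatorStateBackend` code; the operator state is modelled here as a simple list of strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

class AsyncOperatorStateSnapshotSketch {

    static RunnableFuture<byte[]> snapshot(List<String> liveState) {
        // Synchronous part: deep copy, so later modifications don't leak into the snapshot.
        final List<String> copy = new ArrayList<>(liveState);

        // Asynchronous part: a caller runs this future in a background thread and later
        // calls get() to obtain the materialized snapshot bytes.
        return new FutureTask<>(() -> String.join("\n", copy).getBytes(StandardCharsets.UTF_8));
    }
}
```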
[jira] [Created] (FLINK-6048) Asynchronous snapshots for heap-based operator state backends
Stefan Richter created FLINK-6048: - Summary: Asynchronous snapshots for heap-based operator state backends Key: FLINK-6048 URL: https://issues.apache.org/jira/browse/FLINK-6048 Project: Flink Issue Type: New Feature Components: State Backends, Checkpointing Affects Versions: 1.3.0 Reporter: Stefan Richter Assignee: Stefan Richter Fix For: 1.3.0 The synchronous checkpointing mechanism of heap-based operator state backends blocks element processing for the duration of the checkpoint. We could implement a heap-based operator state backend that allows for asynchronous checkpoints. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6047) Master Jira for "Retraction for Flink Streaming"
Shaoxuan Wang created FLINK-6047: Summary: Master Jira for "Retraction for Flink Streaming" Key: FLINK-6047 URL: https://issues.apache.org/jira/browse/FLINK-6047 Project: Flink Issue Type: New Feature Reporter: Shaoxuan Wang Assignee: Shaoxuan Wang [Design doc]: https://docs.google.com/document/d/18XlGPcfsGbnPSApRipJDLPg5IFNGTQjnz7emkVpZlkw [Introduction]: "Retraction" is an important building block for data streaming to refine the early fired results in streaming. “Early firing” is very common and widely used in many streaming scenarios, for instance “window-less” or unbounded aggregate and stream-stream inner join, and windowed (with early firing) aggregate and stream-stream inner join. There are mainly two cases that require retractions: 1) an update on a keyed table (the key is either a primaryKey (PK) on the source table, or a groupKey/partitionKey in an aggregate); 2) when dynamic windows (e.g., session windows) are in use, the new value may replace more than one previous window due to window merging. To the best of our knowledge, retraction of early fired streaming results has never been practically solved before. In this proposal, we develop a retraction solution and explain how it works for the problem of “update on the keyed table”. The same solution can easily be extended to dynamic window merging, as the key component of retraction - how to refine an early fired result - is the same across different problems. [Proposed Jiras]: Implement decoration phase for predicated logical plan rewriting after volcano optimization phase Add source with table primary key and replace table property Add sink tableInsert and NeedRetract property Implement the retraction for partitioned unbounded over window aggregate Implement the retraction for stream-stream inner join Implement the retraction for the early firing window Implement the retraction for the dynamic window with early firing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105943415 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* 
{@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final Map cachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) { +
[jira] [Commented] (FLINK-1579) Create a Flink History Server
[ https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924427#comment-15924427 ] ASF GitHub Bot commented on FLINK-1579: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105943415 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() 
{ + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final Map cachedArchives; + private final File webDir; + private final File webTmpDir; +
[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)
[ https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924428#comment-15924428 ] ASF GitHub Bot commented on FLINK-5985: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3523 Sure, I just quickly prepared a backport here: https://github.com/StefanRRichter/flink/tree/FLINK-5985-backport-to-1.2 > Flink treats every task as stateful (making topology changes impossible) > > > Key: FLINK-5985 > URL: https://issues.apache.org/jira/browse/FLINK-5985 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Gyula Fora >Priority: Critical > > It seems that Flink treats every Task as stateful so changing the topology > is not possible without setting uid on every single operator. > If the topology has an iteration this is virtually impossible (or at least > gets super hacky) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3523: [FLINK-5985] Report no task states for stateless tasks in...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3523 Sure, I just quickly prepared a backport here: https://github.com/StefanRRichter/flink/tree/FLINK-5985-backport-to-1.2 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105937756 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.channel.ChannelHandler; +import org.apache.flink.runtime.webmonitor.files.AbstractStaticFileServerHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +@ChannelHandler.Sharable +public class HistoryServerStaticFileServerHandler extends AbstractStaticFileServerHandler { --- End diff -- all right. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3535: [FLINK-5713] Protect against NPE in WindowOperator...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3535#discussion_r105931848 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -354,22 +354,27 @@ public void merge(W mergeResult, } }); - // drop if the window is already late - if (isLate(actualWindow)) { - mergingWindows.retireWindow(actualWindow); - continue; - } + context.key = key; + context.window = actualWindow; W stateWindow = mergingWindows.getStateWindow(actualWindow); if (stateWindow == null) { throw new IllegalStateException("Window " + window + " is not in in-flight window set."); } windowState.setCurrentNamespace(stateWindow); - windowState.add(element.getValue()); - context.key = key; - context.window = actualWindow; + // Drop if the window is already late. In rare cases (with a misbehaving + // WindowAssigner) it can happen that a window becomes late that already has + // state (contents, state and timers). That's why we first get the window state + // above and then drop everything. + if (isLate(actualWindow)) { + clearAllState(actualWindow, windowState, mergingWindows); + mergingWindows.persist(); --- End diff -- Why move `mergingWindows.persist()` from `clearAllState` to here, And we need not do the null check? How about ``` if (mergingWindows != null) { mergingWindows.persist(); } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1579) Create a Flink History Server
[ https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924397#comment-15924397 ] ASF GitHub Bot commented on FLINK-1579: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105937756 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.channel.ChannelHandler; +import org.apache.flink.runtime.webmonitor.files.AbstractStaticFileServerHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +@ChannelHandler.Sharable +public class HistoryServerStaticFileServerHandler extends AbstractStaticFileServerHandler { --- End diff -- all right. > Create a Flink History Server > - > > Key: FLINK-1579 > URL: https://issues.apache.org/jira/browse/FLINK-1579 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Chesnay Schepler > > Right now its not possible to analyze the job results for jobs that ran on > YARN, because we'll loose the information once the JobManager has stopped. > Therefore, I propose to implement a "Flink History Server" which serves the > results from these jobs. > I haven't started thinking about the implementation, but I suspect it > involves some JSON files stored in HDFS :) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5713) Protect against NPE in WindowOperator window cleanup
[ https://issues.apache.org/jira/browse/FLINK-5713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924386#comment-15924386 ] ASF GitHub Bot commented on FLINK-5713: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3535#discussion_r105931848 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -354,22 +354,27 @@ public void merge(W mergeResult, } }); - // drop if the window is already late - if (isLate(actualWindow)) { - mergingWindows.retireWindow(actualWindow); - continue; - } + context.key = key; + context.window = actualWindow; W stateWindow = mergingWindows.getStateWindow(actualWindow); if (stateWindow == null) { throw new IllegalStateException("Window " + window + " is not in in-flight window set."); } windowState.setCurrentNamespace(stateWindow); - windowState.add(element.getValue()); - context.key = key; - context.window = actualWindow; + // Drop if the window is already late. In rare cases (with a misbehaving + // WindowAssigner) it can happen that a window becomes late that already has + // state (contents, state and timers). That's why we first get the window state + // above and then drop everything. + if (isLate(actualWindow)) { + clearAllState(actualWindow, windowState, mergingWindows); + mergingWindows.persist(); --- End diff -- Why move `mergingWindows.persist()` from `clearAllState` to here, And we need not do the null check? How about ``` if (mergingWindows != null) { mergingWindows.persist(); } ``` > Protect against NPE in WindowOperator window cleanup > > > Key: FLINK-5713 > URL: https://issues.apache.org/jira/browse/FLINK-5713 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.2.1 > > > Some (misbehaved) WindowAssigners can cause windows to be dropped from the > merging window set while a cleanup timer is still active. This will trigger a > NullPointerException when that timer fires. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
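The JIRA description above spells out the failure mode: a cleanup timer can fire for a window that a misbehaving WindowAssigner already caused to be dropped from the merging window set. A minimal sketch of that defensive pattern follows; the names (`onCleanupTimer`, the window-to-state-window map) are hypothetical, and this is an illustration of the idea rather than the change made in PR #3535.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the defensive cleanup FLINK-5713 asks for: when the cleanup timer fires
 * and the window is no longer tracked, skip cleanup instead of dereferencing null.
 * All names here are illustrative stand-ins, not the actual WindowOperator members.
 */
public class WindowCleanupSketch<W> {

    private final Map<W, W> windowToStateWindow = new HashMap<>();

    void onCleanupTimer(W window) {
        W stateWindow = windowToStateWindow.get(window);
        if (stateWindow == null) {
            // The window was already retired from the merging window set;
            // without this guard, clearing state for a null namespace would NPE.
            return;
        }
        clearState(stateWindow);
        windowToStateWindow.remove(window);
    }

    private void clearState(W stateWindow) {
        // a real implementation would clear window contents, merging state and timers
    }
}
```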
[jira] [Commented] (FLINK-1579) Create a Flink History Server
[ https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924377#comment-15924377 ] ASF GitHub Bot commented on FLINK-1579: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105934771 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/AbstractStaticFileServerHandler.java --- @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.files; + +/* + * This code is based on the "HttpStaticFileServerHandler" from the + * Netty project's HTTP server example. + * + * See http://netty.io and + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java + */ + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpChunkedInput; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; +import io.netty.util.CharsetUtil; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.net.URI; +import java.net.URL; +import java.nio.file.Files; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.Locale; +import java.util.TimeZone; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaders.Names.DATE; +import static io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES; +import static io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE; +import static 
io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Simple file server handler that serves requests to web frontend's static files, such as + * HTML, CSS, or JS files. + * + * This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server + * example. + * + * For every incoming requests the {@link Routed#path()} is pre-processed in + * {@link AbstractStaticFileServerHandler#preProcessRequestPath(String)}. + * + * This path is then interp
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105934771 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/AbstractStaticFileServerHandler.java --- @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.files; + +/* + * This code is based on the "HttpStaticFileServerHandler" from the + * Netty project's HTTP server example. + * + * See http://netty.io and + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java + */ + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpChunkedInput; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; +import io.netty.util.CharsetUtil; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.net.URI; +import java.net.URL; +import java.nio.file.Files; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.Locale; +import java.util.TimeZone; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaders.Names.DATE; +import static io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES; +import static io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE; +import static io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static 
io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Simple file server handler that serves requests to web frontend's static files, such as + * HTML, CSS, or JS files. + * + * This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server + * example. + * + * For every incoming requests the {@link Routed#path()} is pre-processed in + * {@link AbstractStaticFileServerHandler#preProcessRequestPath(String)}. + * + * This path is then interpreted as a relative file path, with the configured rootDir being the parent. + * + * If no file exists for this path, another (optional) pre-processing step is executed in + * {@link AbstractStaticFileServerHandler#preProcessFilePath(String)}
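The javadoc quoted above describes the handler's two steps: pre-process the routed request path, then interpret it as a file path relative to the configured rootDir. Below is a simplified, standalone sketch of that resolution logic, with an added canonical-path traversal check; it is illustrative only and not the Flink handler, which additionally handles caching headers, SSL, and chunked file transfer.

```java
import java.io.File;
import java.io.IOException;

/**
 * Simplified sketch of rootDir-relative static file resolution as described in the
 * AbstractStaticFileServerHandler javadoc. Names and details are illustrative.
 */
public class StaticPathResolverSketch {

    private final File rootDir;

    public StaticPathResolverSketch(File rootDir) {
        this.rootDir = rootDir;
    }

    /** Pre-process the routed path, e.g. map "/" to the frontend's index page. */
    private String preProcessRequestPath(String path) {
        return path.isEmpty() || "/".equals(path) ? "/index.html" : path;
    }

    /**
     * Resolves the request path against rootDir and rejects anything that escapes it
     * (".." traversal) or does not exist, returning null for such requests.
     */
    public File resolve(String requestPath) throws IOException {
        String processed = preProcessRequestPath(requestPath);
        File file = new File(rootDir, processed);
        // getCanonicalPath collapses "..", so a traversal attempt no longer starts
        // with the root directory's canonical path and is rejected.
        if (!file.getCanonicalPath().startsWith(rootDir.getCanonicalPath())) {
            return null;
        }
        return file.exists() && file.isFile() ? file : null;
    }
}
```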
[GitHub] flink issue #3406: [flink-5568] [Table API & SQL]Introduce interface for cat...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3406 Thanks for the update @beyond1920. The PR looks good to me. @twalthr do you also want to have a look at this PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-6046) Add support for oversized messages during deployment
[ https://issues.apache.org/jira/browse/FLINK-6046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-6046: --- Description: This is the non-FLIP6 version of FLINK-4346, restricted to deployment messages: Currently, messages larger than the maximum Akka Framesize cause an error when being transported. We should add a way to pass messages that are larger than {{akka.framesize}} as may happen for task deployments via the {{TaskDeploymentDescriptor}}. We should use the {{BlobServer}} to offload big data items (if possible) and make use of any potential distributed file system behind. This way, not only do we avoid the akka framesize restriction, but may also be able to speed up deployment. I suggest the following changes: - the sender, i.e. the {{Execution}} class, tries to store the serialized job information and serialized task information (if oversized) from the {{TaskDeploymentDescriptor}} (tdd) on the {{BlobServer}} as a single {{NAME_ADDRESSABLE}} blob under its job ID (if this does not work, we send the whole tdd as usual via akka) - if stored in a blob, these data items are removed from the tdd - the receiver, i.e. the {{TaskManager}} class, tries to retrieve any offloaded data after receiving the {{TaskDeploymentDescriptor}} from akka; it re-assembles the original tdd - the stored blob may be deleted after re-assembly of the tdd Further (future) changes may include: - separating the serialized job information and serialized task information into two files and re-use the first one for all tasks - not re-deploying these two during job recovery (if possible) - then, as all other {{NAME_ADDRESSABLE}} blobs, these offloaded blobs may be removed when the job enters a final state instead was: This is the non-FLIP6 version of FLINK-4346, restricted to deployment messages: Currently, messages larger than the maximum Akka Framesize cause an error when being transported. We should add a way to pass messages that are larger than {{akka.framesize}} as may happen for task deployments via the {{TaskDeploymentDescriptor}}. We should use the {{BlobServer}} to offload big data items (if possible) and make use of any potential distributed file system behind. This way, not only do we avoid the akka framesize restriction, but may also be able to speed up deployment. I suggest the following changes: - the sender, i.e. the {{Execution}} class, tries to store the serialized job information and serialized task information (if oversized) from the {{TaskDeploymentDescriptor}} (tdd) on the {{BlobServer}} as a single {{NAME_ADDRESSABLE}} blob under its job ID (if this does not work, we send the whole tdd as usual via akka) - if stored in a blob, these data items are removed from the tdd - the receiver, i.e. 
the {{TaskManager}} class, tries to retrieve any offloaded data after receiving the {{TaskDeploymentDescriptor}} from akka; it re-assembles the original tdd - as all {{NAME_ADDRESSABLE}} blobs, these offloaded blobs are removed when the job enters a final state Further (future) changes may include: - separating the serialized job information and serialized task information into two files and re-use the first one for all tasks - not re-deploying these two during job recovery (if possible) > Add support for oversized messages during deployment > > > Key: FLINK-6046 > URL: https://issues.apache.org/jira/browse/FLINK-6046 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Reporter: Nico Kruber >Assignee: Nico Kruber > > This is the non-FLIP6 version of FLINK-4346, restricted to deployment > messages: > Currently, messages larger than the maximum Akka Framesize cause an error > when being transported. We should add a way to pass messages that are larger > than {{akka.framesize}} as may happen for task deployments via the > {{TaskDeploymentDescriptor}}. > We should use the {{BlobServer}} to offload big data items (if possible) and > make use of any potential distributed file system behind. This way, not only > do we avoid the akka framesize restriction, but may also be able to speed up > deployment. > I suggest the following changes: > - the sender, i.e. the {{Execution}} class, tries to store the serialized > job information and serialized task information (if oversized) from the > {{TaskDeploymentDescriptor}} (tdd) on the {{BlobServer}} as a single > {{NAME_ADDRESSABLE}} blob under its job ID (if this does not work, we send > the whole tdd as usual via akka) > - if stored in a blob, these data items are removed from the tdd > - the receiver, i.e. the {{TaskManager}} class, tries to retrieve any > offloaded data after receiving the {{TaskDeploymentDescriptor}} from akka; it > re-assembles the or
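The proposed flow -- offload the serialized deployment data when it would exceed `akka.framesize`, ship only a reference, and re-assemble it on the receiving side -- can be sketched as follows. This is a toy illustration under assumed names (`BlobStore`, `BlobKey`, `MaybeOffloaded`, the in-memory map); it does not use Flink's actual BlobServer API or the real TaskDeploymentDescriptor fields.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Sketch of the FLINK-6046 idea: payloads larger than the akka frame size are written
 * to a blob store and only a key is sent; the receiver fetches and re-assembles them.
 */
public class DeploymentOffloadSketch {

    static class BlobKey implements Serializable {
        final String id = UUID.randomUUID().toString();
    }

    /** In-memory stand-in for the BlobServer / backing distributed file system. */
    static class BlobStore {
        private final Map<String, byte[]> blobs = new HashMap<>();
        BlobKey put(byte[] data) { BlobKey key = new BlobKey(); blobs.put(key.id, data); return key; }
        byte[] get(BlobKey key) { return blobs.get(key.id); }
    }

    /** Either the inline payload or a key pointing at the offloaded blob. */
    static class MaybeOffloaded implements Serializable {
        byte[] inline;      // set if the payload fits into an akka message
        BlobKey offloaded;  // set if the payload was written to the blob store
    }

    /** Sender side: offload only when the payload would not fit into one akka frame. */
    static MaybeOffloaded prepareForSending(byte[] serialized, long maxFrameSize, BlobStore store) {
        MaybeOffloaded message = new MaybeOffloaded();
        if (serialized.length <= maxFrameSize) {
            message.inline = serialized;
        } else {
            message.offloaded = store.put(serialized);
        }
        return message;
    }

    /** Receiver side: re-assemble the original payload before using it. */
    static byte[] reassembleOnReceiver(MaybeOffloaded message, BlobStore store) {
        return message.inline != null ? message.inline : store.get(message.offloaded);
    }
}
```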
[jira] [Commented] (FLINK-1579) Create a Flink History Server
[ https://issues.apache.org/jira/browse/FLINK-1579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924359#comment-15924359 ] ASF GitHub Bot commented on FLINK-1579: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105931282 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() 
{ + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final Map cachedArchives; + private final File webDir; + private final File webTmpDir; +