[jira] [Commented] (FLINK-3657) Change access of DataSetUtils.countElements() to 'public'
[ https://issues.apache.org/jira/browse/FLINK-3657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207953#comment-15207953 ]

ASF GitHub Bot commented on FLINK-3657:
---------------------------------------

GitHub user smarthi opened a pull request:

    https://github.com/apache/flink/pull/1829

    FLINK-3657: Change access of DataSetUtils.countElements() to 'public'

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/smarthi/flink Flink-3657

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1829.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1829

commit d5ba6d90d3cdd6b93f962f2d9226e443810b08b0
Author: smarthi
Date:   2016-03-23T06:28:28Z

    FLINK-3657: Change access of DataSetUtils.countElements() to 'public'

> Change access of DataSetUtils.countElements() to 'public'
> ----------------------------------------------------------
>
>                 Key: FLINK-3657
>                 URL: https://issues.apache.org/jira/browse/FLINK-3657
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataSet API
>    Affects Versions: 1.0.0
>            Reporter: Suneel Marthi
>            Assignee: Suneel Marthi
>            Priority: Minor
>             Fix For: 1.0.1
>
>
> The access of DataSetUtils.countElements() is presently 'private'; change
> it to 'public'. We happened to be replicating the functionality in our
> project and realized the method already existed in Flink.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (FLINK-3657) Change access of DataSetUtils.countElements() to 'public'
Suneel Marthi created FLINK-3657:
------------------------------------

             Summary: Change access of DataSetUtils.countElements() to 'public'
                 Key: FLINK-3657
                 URL: https://issues.apache.org/jira/browse/FLINK-3657
             Project: Flink
          Issue Type: Improvement
          Components: DataSet API
    Affects Versions: 1.0.0
            Reporter: Suneel Marthi
            Assignee: Suneel Marthi
            Priority: Minor
             Fix For: 1.0.1


The access of DataSetUtils.countElements() is presently 'private'; change it to 'public'. We happened to be replicating the functionality in our project and realized the method already existed in Flink.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
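For readers who, like the reporter, were about to re-implement this counting step, a minimal usage sketch of countElements() once it becomes public is shown below. The return type DataSet<Tuple2<Integer, Long>> — one (subtask index, element count) pair per parallel partition — is an assumption based on how zipWithIndex() consumes the helper; check the final signature in DataSetUtils before relying on it.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.DataSetUtils;

    public class CountElementsSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<String> words = env.fromElements("to", "be", "or", "not", "to", "be");

            // Hypothetical call: one (subtaskIndex, count) record per parallel partition,
            // assuming the method keeps the signature it has as a private helper.
            DataSet<Tuple2<Integer, Long>> countsPerPartition = DataSetUtils.countElements(words);

            // print() triggers execution and prints the per-partition counts.
            countsPerPartition.print();
        }
    }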
[jira] [Commented] (FLINK-2997) Support range partition with user customized data distribution.
[ https://issues.apache.org/jira/browse/FLINK-2997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207407#comment-15207407 ] ASF GitHub Bot commented on FLINK-2997: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1776#discussion_r57078308 --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.javaApiOperators; + +import org.apache.flink.api.common.distributions.DataDistribution; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; +import org.apache.flink.util.Collector; +import org.junit.Test; + + +import java.io.IOException; + +import static org.junit.Assert.fail; + + +public class CustomDistributionITCase { + + @Test + public void testPartitionWithDistribution1() throws Exception{ + /* +* Test the record partitioned rightly with one field according to the customized data distribution +*/ + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + + DataSet> input1 = CollectionDataSets.get3TupleDataSet(env); + final TestDataDist dist = new TestDataDist(1); + + env.setParallelism(dist.getParallelism()); + + DataSet result = DataSetUtils.partitionByRange(input1, dist, 0).mapPartition(new RichMapPartitionFunction, Boolean>() { + @Override + public void mapPartition(Iterable> values, Collector out) throws Exception { + int partitionIndex = getRuntimeContext().getIndexOfThisSubtask(); + + for (Tuple3 s : values) { + if ((s.f0 - 1) / 7 != partitionIndex) { + fail("Record was not correctly partitioned: " + s.toString()); + } + } + } + }); + + result.output(new DiscardingOutputFormat()); + env.execute(); + } + + @Test + public void testRangeWithDistribution2() throws Exception{ + /* +* Test the record partitioned rightly with two fields according to the customized data distribution +*/ + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + + DataSet> input1 = CollectionDataSets.get3TupleDataSet(env); + final TestDataDist dist = 
new TestDataDist(2); + + env.setParallelism(dist.getParallelism()); + + DataSet result = DataSetUtils.partitionByRange(input1.map(new MapFunction, Tuple3>() { + @Override + public Tuple3 map(Tuple3 value) throws Exception { + return new Tuple3<>(value.f0, value.f1.intValue(), value.f2); + } + }), dist, 0, 1).mapPartition(new RichMapPartitionFunction, Boolean>() { + @Override + public void mapPartit
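The archived diff above lost its Java generic parameters (the angle-bracket types were stripped in transit), so the sketch below reconstructs the pattern the test exercises in self-contained form: a user-defined DataDistribution handed to DataSetUtils.partitionByRange(...), which is the API this pull request proposes. The boundary values, class names, and the one-boundary-per-partition assumption are illustrative, not the exact contents of the patch.

    import java.io.IOException;

    import org.apache.flink.api.common.distributions.DataDistribution;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.DataSetUtils;
    import org.apache.flink.core.memory.DataInputView;
    import org.apache.flink.core.memory.DataOutputView;

    public class CustomRangePartitionSketch {

        /** A distribution over a single int key with fixed, made-up bucket boundaries. */
        public static class FixedIntDistribution implements DataDistribution {

            private static final int[] BOUNDARIES = {10, 20, 30};

            @Override
            public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
                // Upper boundary of the given bucket; assumes one boundary per target partition.
                return new Object[] { BOUNDARIES[bucketNum] };
            }

            @Override
            public int getNumberOfFields() {
                return 1; // the distribution describes a single key field
            }

            @Override
            public void write(DataOutputView out) throws IOException {
                // nothing to serialize: the boundaries are compile-time constants
            }

            @Override
            public void read(DataInputView in) throws IOException {
                // nothing to deserialize
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(3); // matches the number of buckets described above

            DataSet<Tuple2<Integer, String>> input = env.fromElements(
                    new Tuple2<>(3, "a"), new Tuple2<>(17, "b"), new Tuple2<>(25, "c"));

            // partitionByRange(DataSet, DataDistribution, keyFields...) is the method
            // under review in this pull request; its final signature may still change.
            DataSet<Tuple2<Integer, String>> partitioned =
                    DataSetUtils.partitionByRange(input, new FixedIntDistribution(), 0);

            partitioned.print();
        }
    }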
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207374#comment-15207374 ] ASF GitHub Bot commented on FLINK-3639: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57075247 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala --- @@ -87,5 +90,72 @@ class TableEnvironment { new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo) } -} + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are used to name the Table fields. + * @param name the Table name + * @param dataset the DataSet to register + */ + def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = { + +val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType) +val dataSetTable = new DataSetTable[T]( + dataset, + fieldIndexes, + fieldNames +) +TranslationContext.addAndRegisterDataSet(dataSetTable, name) + } + + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are renamed to the given set of fields. + * + * @param name the Table name + * @param dataset the DataSet to register + * @param fields the Table field names + */ + def registerDataSet[T](name: String, dataset: DataSet[T], fields: String): Unit = { + +val exprs = ExpressionParser + .parseExpressionList(fields) + .toArray + +val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType, exprs) +val dataSetTable = new DataSetTable[T]( + dataset, + fieldIndexes.toArray, + fieldNames.toArray +) +TranslationContext.addAndRegisterDataSet(dataSetTable, name) + } + + /** + * Registers a Table under a unique name, so that it can be used in SQL queries. + * @param name the Table name + * @param table the Table to register + */ + def registerTable[T](name: String, table: Table): Unit = { +val tableTable = new TableTable(table.getRelNode()) +TranslationContext.registerTable(tableTable, name) + } + + /** + * Retrieve a registered Table. + * @param tableName the name under which the Table has been registered + * @return the Table object + */ + @throws[TableException] + def scan(tableName: String): Table = { +if (TranslationContext.isRegistered(tableName)) { + val relBuilder = TranslationContext.getRelBuilder + relBuilder.scan(tableName) + new Table(relBuilder.build(), relBuilder) +} +else { + throw new TableException("Table \"" + tableName + "\" was not found in the registry.") --- End diff -- That's much nicer than my Java-ish way :S > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
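To make the proposed registration API concrete, a hypothetical Java-side usage sketch is given below. It assumes the methods land with the signatures shown in this diff (registerDataSet, registerTable, scan) and that the TableEnvironment is instantiated directly, as in the existing Table API examples; the table name and field names are made up.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.table.TableEnvironment;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.table.Table;

    public class RegisterDataSetSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            TableEnvironment tableEnv = new TableEnvironment();

            DataSet<Tuple3<Long, String, Integer>> orders = env.fromElements(
                    new Tuple3<>(1L, "books", 12),
                    new Tuple3<>(2L, "games", 3));

            // Register the DataSet under a unique name, renaming its fields.
            tableEnv.registerDataSet("Orders", orders, "id, product, amount");

            // Later (e.g. from a SQL query or another part of the program),
            // look the table up again by name.
            Table ordersTable = tableEnv.scan("Orders");

            // From here on it is an ordinary Table; registerTable("Result", result)
            // would put a derived Table back into the registry.
            Table result = ordersTable.select("product, amount");
        }
    }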
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207368#comment-15207368 ] ASF GitHub Bot commented on FLINK-3639: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57074776 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala --- @@ -72,5 +74,68 @@ class TableEnvironment { new ScalaBatchTranslator(config).translate[T](table.relNode) } -} + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are used to name the Table fields. + * @param name the Table name + * @param dataset the DataSet to register + */ + def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = { --- End diff -- probably :) > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207361#comment-15207361 ] ASF GitHub Bot commented on FLINK-3639: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57073483 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.schema + +import java.lang.Double +import java.util +import java.util.Collections + +import org.apache.calcite.plan.RelOptTable +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode} +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.Schema.TableType +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.calcite.schema.{TranslatableTable, Statistic} +import org.apache.calcite.util.ImmutableBitSet + +class TableTable(relNode: RelNode) extends AbstractTable with TranslatableTable { + + override def getStatistic: Statistic = new DefaultTableStatistic + + override def getJdbcTableType: TableType = ??? + + override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType + + override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = { +relNode + } +} + +class DefaultTableStatistic extends Statistic { --- End diff -- Yes, I think you can but don't have to override it. > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
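For context on why overriding is optional here: Calcite's AbstractTable already returns org.apache.calcite.schema.Statistics.UNKNOWN from getStatistic(), so a table class that has nothing better to report can simply inherit that default instead of shipping its own DefaultTableStatistic. A minimal Java sketch (the class name and row type are made up):

    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeFactory;
    import org.apache.calcite.schema.impl.AbstractTable;

    /** Illustration only: no getStatistic() override is needed. */
    public class StatisticDefaultSketch extends AbstractTable {

        @Override
        public RelDataType getRowType(RelDataTypeFactory typeFactory) {
            // A single made-up column, just so the class is complete.
            return typeFactory.builder()
                    .add("name", typeFactory.createJavaType(String.class))
                    .build();
        }

        // Not overriding getStatistic() is equivalent to returning Statistics.UNKNOWN.
        // Hard-coding numbers here could hide statistics the planner might otherwise
        // derive from the relational expression the table wraps, which is the concern
        // raised in this review thread.
    }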
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207336#comment-15207336 ] ASF GitHub Bot commented on FLINK-3639: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57070386 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.schema + +import java.lang.Double +import java.util +import java.util.Collections + +import org.apache.calcite.plan.RelOptTable +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode} +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.Schema.TableType +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.calcite.schema.{TranslatableTable, Statistic} +import org.apache.calcite.util.ImmutableBitSet + +class TableTable(relNode: RelNode) extends AbstractTable with TranslatableTable { + + override def getStatistic: Statistic = new DefaultTableStatistic + + override def getJdbcTableType: TableType = ??? + + override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType + + override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = { +relNode + } +} + +class DefaultTableStatistic extends Statistic { --- End diff -- Shall I just leave the method unimplemented then? > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3644) WebRuntimeMonitor: setting java.io.tmpdir does not change the upload dir
[ https://issues.apache.org/jira/browse/FLINK-3644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207335#comment-15207335 ]

ASF GitHub Bot commented on FLINK-3644:
---------------------------------------

Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1824#issuecomment-200032560

    Hi @Astralidea, thanks for the PR. We are aware of some flaky tests, so there is no need to trigger new builds.

> WebRuntimeMonitor: setting java.io.tmpdir does not change the upload dir
> --------------------------------------------------------------------------
>
>                 Key: FLINK-3644
>                 URL: https://issues.apache.org/jira/browse/FLINK-3644
>             Project: Flink
>          Issue Type: Bug
>          Components: Webfrontend
>    Affects Versions: 1.0.0
>         Environment: flink-conf.yaml -> java.io.tmpdir: .
>                      java -server -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+CMSClassUnloadingEnabled -XX:+UseParNewGC -XX:+UseCompressedOops -XX:+UseFastEmptyMethods -XX:+UseFastAccessorMethods -XX:+AlwaysPreTouch -Xmx1707m -Dlog4j.configuration=file:log4j-mesos.properties -Djava.io.tmpdir=. -cp flink-dist_2.10-1.0.0.jar:log4j-1.2.17.jar:slf4j-log4j12-1.7.7.jar:flink-python_2.10-1.0.0.jar
>                      java version "1.8.0_60"
>                      Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
>                      Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
>                      CentOS release 6.4 (Final)
>            Reporter: astralidea
>
> Setting java.io.tmpdir in flink-conf.yaml and via -Djava.io.tmpdir=. does not work for me, and I don't know why. Looking at the code, System.getProperty("java.io.tmpdir") should pick it up, but it does not. The job manager configuration shown in the web UI does list java.io.tmpdir as set.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
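The report mixes two mechanisms: Flink's own configuration (flink-conf.yaml, which is what the job manager page in the web UI displays) and JVM system properties, which is what System.getProperty("java.io.tmpdir") reads. The probe below is a sketch unrelated to the actual WebRuntimeMonitor code; it only shows that the property reflects what the reporting JVM was launched with, and whether that explains this report depends on which JVM the -D flag actually reached.

    public class TmpDirProbe {
        public static void main(String[] args) {
            // Prints whatever the JVM was started with, e.g.
            //     java -Djava.io.tmpdir=. TmpDirProbe
            // An entry in flink-conf.yaml does not become a system property by itself,
            // so it can show up in the web UI's configuration view without ever
            // reaching System.getProperty() in the server process.
            System.out.println("java.io.tmpdir = " + System.getProperty("java.io.tmpdir"));
        }
    }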
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207328#comment-15207328 ] ASF GitHub Bot commented on FLINK-3639: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57069968 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala --- @@ -59,29 +64,55 @@ object TranslationContext { .traitDefs(ConventionTraitDef.INSTANCE) .build -tabNames = Map[AbstractTable, String]() - +tablesRegistry = Map[String, AbstractTable]() relBuilder = RelBuilder.create(frameworkConfig) - nameCntr.set(0) } def addDataSet(newTable: DataSetTable[_]): String = { +val tabName = "DataSetTable_" + nameCntr.getAndIncrement() +tables.add(tabName, newTable) +tabName + } + + @throws[TableException] + def addAndRegisterDataSet(table: DataSetTable[_], name: String): Unit = { --- End diff -- sounds right :) > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207306#comment-15207306 ] ASF GitHub Bot commented on FLINK-3639: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1827#issuecomment-200025666 Thanks for the PR! Looks good overall. I think we can save a few LOCs by refactoring common parts a bit. Also I spotted more opportunities to use Scala's String building feature (`s""`) which is nicer than String concatenation, IMO. > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207298#comment-15207298 ] ASF GitHub Bot commented on FLINK-3639: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57068281 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.table.test + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.table.{TableException, Row} +import org.apache.flink.api.table.plan.TranslationContext +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class RegisterDataSetITCase( --- End diff -- If we move the common methods of the Java and Scala `TableEnvironment`s to an `AbstractTableEnvironment`, it is sufficient to only have the Scala tests, IMO. > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2909) Gelly Graph Generators
[ https://issues.apache.org/jira/browse/FLINK-2909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207280#comment-15207280 ] ASF GitHub Bot commented on FLINK-2909: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1807#discussion_r57067037 --- Diff: flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.junit.Before; + +public class AbstractGraphTest { + + protected ExecutionEnvironment env; + +@Before +public void setup() { + env = ExecutionEnvironment.createCollectionsEnvironment(); + env.getConfig().disableSysoutLogging(); --- End diff -- I think this has no effect on a CollectionEnvironment. > Gelly Graph Generators > -- > > Key: FLINK-2909 > URL: https://issues.apache.org/jira/browse/FLINK-2909 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.0.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > > Include a selection of graph generators in Gelly. Generated graphs will be > useful for performing scalability, stress, and regression testing as well as > benchmarking and comparing algorithms, for both Flink users and developers. > Generated data is infinitely scalable yet described by a few simple > parameters and can often substitute for user data or sharing large files when > reporting issues. > There are at multiple categories of graphs as documented by > [NetworkX|https://networkx.github.io/documentation/latest/reference/generators.html] > and elsewhere. > Graphs may be a well-defined, i.e. the [Chvátal > graph|https://en.wikipedia.org/wiki/Chv%C3%A1tal_graph]. These may be > sufficiently small to populate locally. > Graphs may be scalable, i.e. complete and star graphs. These should use > Flink's distributed parallelism. > Graphs may be stochastic, i.e. [RMat > graphs|http://snap.stanford.edu/class/cs224w-readings/chakrabarti04rmat.pdf] > . A key consideration is that the graphs should source randomness from a > seedable PRNG and generate the same Graph regardless of parallelism. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
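Since the generator API is still under review in this pull request, the sketch below is only a guess at the intended usage based on the issue description (scalable graphs described by a few parameters, generated as a normal distributed job); the class name CompleteGraph and the generate() method are assumptions, not the final API.

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.generator.CompleteGraph;
    import org.apache.flink.types.LongValue;
    import org.apache.flink.types.NullValue;

    public class GraphGeneratorSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical: a complete graph on 1000 vertices, produced as a parallel
            // Flink job, so the same two parameters scale to arbitrarily large graphs.
            Graph<LongValue, NullValue, NullValue> graph =
                    new CompleteGraph(env, 1000).generate();

            System.out.println("edges: " + graph.numberOfEdges());
        }
    }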
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207277#comment-15207277 ] ASF GitHub Bot commented on FLINK-3639: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57066925 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.schema + +import java.lang.Double +import java.util +import java.util.Collections + +import org.apache.calcite.plan.RelOptTable +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode} +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.Schema.TableType +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.calcite.schema.{TranslatableTable, Statistic} +import org.apache.calcite.util.ImmutableBitSet + +class TableTable(relNode: RelNode) extends AbstractTable with TranslatableTable { + + override def getStatistic: Statistic = new DefaultTableStatistic + + override def getJdbcTableType: TableType = ??? + + override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = relNode.getRowType + + override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode = { +relNode + } +} + +class DefaultTableStatistic extends Statistic { --- End diff -- Do not provide statistics here. It might override valid statistics that Calcite computed from the relational expression that the Table represents. > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207269#comment-15207269 ] ASF GitHub Bot commented on FLINK-3639: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57066656 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.schema + +import java.lang.Double +import java.util +import java.util.Collections + +import org.apache.calcite.plan.RelOptTable +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode} +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.Schema.TableType +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.calcite.schema.{TranslatableTable, Statistic} +import org.apache.calcite.util.ImmutableBitSet + +class TableTable(relNode: RelNode) extends AbstractTable with TranslatableTable { --- End diff -- Add a brief comment what this class is about. The name might be confusing. > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57066656 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.schema + +import java.lang.Double +import java.util +import java.util.Collections + +import org.apache.calcite.plan.RelOptTable +import org.apache.calcite.plan.RelOptTable.ToRelContext +import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode} +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.schema.Schema.TableType +import org.apache.calcite.schema.impl.AbstractTable +import org.apache.calcite.schema.{TranslatableTable, Statistic} +import org.apache.calcite.util.ImmutableBitSet + +class TableTable(relNode: RelNode) extends AbstractTable with TranslatableTable { --- End diff -- Add a brief comment what this class is about. The name might be confusing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
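A possible class comment along the lines of this suggestion (illustrative wording only, not the text that was committed):

```scala
/**
 * A Calcite table that wraps an already translated relational expression, for example
 * the logical plan behind a [[Table]] registered via `registerTable`. Registering it in
 * the catalog makes the expression queryable from SQL; `toRel` simply returns the
 * wrapped [[RelNode]].
 */
```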
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207264#comment-15207264 ] ASF GitHub Bot commented on FLINK-3639: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57066078 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala --- @@ -72,5 +74,68 @@ class TableEnvironment { new ScalaBatchTranslator(config).translate[T](table.relNode) } -} + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are used to name the Table fields. + * @param name the Table name + * @param dataset the DataSet to register + */ + def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = { --- End diff -- Does it make sense to have an `AbstractTableEnvironment` for the common parts of the Java and the Scala `TableEnvironment`s? > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57066078 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala --- @@ -72,5 +74,68 @@ class TableEnvironment { new ScalaBatchTranslator(config).translate[T](table.relNode) } -} + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are used to name the Table fields. + * @param name the Table name + * @param dataset the DataSet to register + */ + def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = { --- End diff -- Does it make sense to have an `AbstractTableEnvironment` for the common parts of the Java and the Scala `TableEnvironment`s? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
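To illustrate the idea: the registration logic that is identical in the Java and Scala TableEnvironments could move into a shared base class. The sketch below reuses the TranslationContext, DataSetTable and TableTable types visible in the diff; the names AbstractTableEnvironment and registerDataSetInternal are made up for illustration and this is not code from the PR.

```scala
abstract class AbstractTableEnvironment {

  /** Registers a Table under a unique name so it can be referenced from SQL queries. */
  def registerTable(name: String, table: Table): Unit = {
    val tableTable = new TableTable(table.getRelNode())
    TranslationContext.registerTable(tableTable, name)
  }

  /** Shared helper used by both the Java and the Scala registerDataSet variants. */
  protected def registerDataSetInternal[T](
      name: String,
      dataset: org.apache.flink.api.java.DataSet[T],
      fieldNames: Array[String],
      fieldIndexes: Array[Int]): Unit = {
    val dataSetTable = new DataSetTable[T](dataset, fieldIndexes, fieldNames)
    TranslationContext.addAndRegisterDataSet(dataSetTable, name)
  }
}
```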
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207262#comment-15207262 ] ASF GitHub Bot commented on FLINK-3639: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57065868 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala --- @@ -72,5 +74,68 @@ class TableEnvironment { new ScalaBatchTranslator(config).translate[T](table.relNode) } -} + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are used to name the Table fields. + * @param name the Table name + * @param dataset the DataSet to register + */ + def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = { + +val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType) +val dataSetTable = new DataSetTable[T]( + dataset.javaSet, + fieldIndexes, + fieldNames +) +TranslationContext.addAndRegisterDataSet(dataSetTable, name) + } + + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are renamed to the given set of fields. + * + * @param name the Table name + * @param dataset the DataSet to register + * @param fields the field names expression + */ + def registerDataSet[T](name: String, dataset: DataSet[T], fields: Expression*): Unit = { + +val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T]( + dataset.getType, fields.toArray) +val dataSetTable = new DataSetTable[T]( + dataset.javaSet, + fieldIndexes.toArray, + fieldNames.toArray +) +TranslationContext.addAndRegisterDataSet(dataSetTable, name) + } + + /** + * Registers a Table under a unique name, so that it can be used in SQL queries. + * @param name the Table name + * @param table the Table to register + */ + def registerTable[T](name: String, table: Table): Unit = { +val tableTable = new TableTable(table.getRelNode()) +TranslationContext.registerTable(tableTable, name) + } + + /** + * Retrieve a registered Table. + * @param tableName the name under which the Table has been registered + * @return the Table object + */ + @throws[TableException] + def scan(tableName: String): Table = { +if (TranslationContext.isRegistered(tableName)) { + val relBuilder = TranslationContext.getRelBuilder + relBuilder.scan(tableName) + new Table(relBuilder.build(), relBuilder) +} +else { + throw new TableException("Table \"" + tableName + "\" was not found in the registry.") --- End diff -- Use Scala's `s""` string building here as well. > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57065868 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala --- @@ -72,5 +74,68 @@ class TableEnvironment { new ScalaBatchTranslator(config).translate[T](table.relNode) } -} + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are used to name the Table fields. + * @param name the Table name + * @param dataset the DataSet to register + */ + def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = { + +val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType) +val dataSetTable = new DataSetTable[T]( + dataset.javaSet, + fieldIndexes, + fieldNames +) +TranslationContext.addAndRegisterDataSet(dataSetTable, name) + } + + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are renamed to the given set of fields. + * + * @param name the Table name + * @param dataset the DataSet to register + * @param fields the field names expression + */ + def registerDataSet[T](name: String, dataset: DataSet[T], fields: Expression*): Unit = { + +val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T]( + dataset.getType, fields.toArray) +val dataSetTable = new DataSetTable[T]( + dataset.javaSet, + fieldIndexes.toArray, + fieldNames.toArray +) +TranslationContext.addAndRegisterDataSet(dataSetTable, name) + } + + /** + * Registers a Table under a unique name, so that it can be used in SQL queries. + * @param name the Table name + * @param table the Table to register + */ + def registerTable[T](name: String, table: Table): Unit = { +val tableTable = new TableTable(table.getRelNode()) +TranslationContext.registerTable(tableTable, name) + } + + /** + * Retrieve a registered Table. + * @param tableName the name under which the Table has been registered + * @return the Table object + */ + @throws[TableException] + def scan(tableName: String): Table = { +if (TranslationContext.isRegistered(tableName)) { + val relBuilder = TranslationContext.getRelBuilder + relBuilder.scan(tableName) + new Table(relBuilder.build(), relBuilder) +} +else { + throw new TableException("Table \"" + tableName + "\" was not found in the registry.") --- End diff -- Use Scala's `s""` string building here as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207261#comment-15207261 ] ASF GitHub Bot commented on FLINK-3639: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57065751 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala --- @@ -87,5 +90,72 @@ class TableEnvironment { new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo) } -} + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are used to name the Table fields. + * @param name the Table name + * @param dataset the DataSet to register + */ + def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = { + +val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType) +val dataSetTable = new DataSetTable[T]( + dataset, + fieldIndexes, + fieldNames +) +TranslationContext.addAndRegisterDataSet(dataSetTable, name) + } + + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are renamed to the given set of fields. + * + * @param name the Table name + * @param dataset the DataSet to register + * @param fields the Table field names + */ + def registerDataSet[T](name: String, dataset: DataSet[T], fields: String): Unit = { + +val exprs = ExpressionParser + .parseExpressionList(fields) + .toArray + +val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType, exprs) +val dataSetTable = new DataSetTable[T]( + dataset, + fieldIndexes.toArray, + fieldNames.toArray +) +TranslationContext.addAndRegisterDataSet(dataSetTable, name) + } + + /** + * Registers a Table under a unique name, so that it can be used in SQL queries. + * @param name the Table name + * @param table the Table to register + */ + def registerTable[T](name: String, table: Table): Unit = { +val tableTable = new TableTable(table.getRelNode()) +TranslationContext.registerTable(tableTable, name) + } + + /** + * Retrieve a registered Table. + * @param tableName the name under which the Table has been registered + * @return the Table object + */ + @throws[TableException] + def scan(tableName: String): Table = { +if (TranslationContext.isRegistered(tableName)) { + val relBuilder = TranslationContext.getRelBuilder + relBuilder.scan(tableName) + new Table(relBuilder.build(), relBuilder) +} +else { + throw new TableException("Table \"" + tableName + "\" was not found in the registry.") --- End diff -- Strings can be build with Scala like this `s"Table \"$tableName\" was not found in the registry."` > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57065751 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala --- @@ -87,5 +90,72 @@ class TableEnvironment { new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo) } -} + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are used to name the Table fields. + * @param name the Table name + * @param dataset the DataSet to register + */ + def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = { + +val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType) +val dataSetTable = new DataSetTable[T]( + dataset, + fieldIndexes, + fieldNames +) +TranslationContext.addAndRegisterDataSet(dataSetTable, name) + } + + /** + * Registers a DataSet under a unique name, so that it can be used in SQL queries. + * The fields of the DataSet type are renamed to the given set of fields. + * + * @param name the Table name + * @param dataset the DataSet to register + * @param fields the Table field names + */ + def registerDataSet[T](name: String, dataset: DataSet[T], fields: String): Unit = { + +val exprs = ExpressionParser + .parseExpressionList(fields) + .toArray + +val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](dataset.getType, exprs) +val dataSetTable = new DataSetTable[T]( + dataset, + fieldIndexes.toArray, + fieldNames.toArray +) +TranslationContext.addAndRegisterDataSet(dataSetTable, name) + } + + /** + * Registers a Table under a unique name, so that it can be used in SQL queries. + * @param name the Table name + * @param table the Table to register + */ + def registerTable[T](name: String, table: Table): Unit = { +val tableTable = new TableTable(table.getRelNode()) +TranslationContext.registerTable(tableTable, name) + } + + /** + * Retrieve a registered Table. + * @param tableName the name under which the Table has been registered + * @return the Table object + */ + @throws[TableException] + def scan(tableName: String): Table = { +if (TranslationContext.isRegistered(tableName)) { + val relBuilder = TranslationContext.getRelBuilder + relBuilder.scan(tableName) + new Table(relBuilder.build(), relBuilder) +} +else { + throw new TableException("Table \"" + tableName + "\" was not found in the registry.") --- End diff -- Strings can be build with Scala like this `s"Table \"$tableName\" was not found in the registry."` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
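In other words, the suggestion is to replace the Java-style concatenation with an interpolated string, for example:

```scala
// before: Java-style concatenation
throw new TableException("Table \"" + tableName + "\" was not found in the registry.")

// after: Scala string interpolation, as suggested in the review
throw new TableException(s"Table \"$tableName\" was not found in the registry.")
```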
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207254#comment-15207254 ] ASF GitHub Bot commented on FLINK-3639: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57065169 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala --- @@ -93,7 +93,10 @@ object FlinkRuleSets { DataSetCalcRule.INSTANCE, DataSetJoinRule.INSTANCE, DataSetScanRule.INSTANCE, -DataSetUnionRule.INSTANCE +DataSetUnionRule.INSTANCE, + +// convert a logical table scan to a relational expression +TableScanRule.INSTANCE --- End diff -- Can we move this rule to the top, next to the other default Calcite rules? > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57065169 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala --- @@ -93,7 +93,10 @@ object FlinkRuleSets { DataSetCalcRule.INSTANCE, DataSetJoinRule.INSTANCE, DataSetScanRule.INSTANCE, -DataSetUnionRule.INSTANCE +DataSetUnionRule.INSTANCE, + +// convert a logical table scan to a relational expression +TableScanRule.INSTANCE --- End diff -- Can we move this rule to the top, next to the other default Calcite rules? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
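An abbreviated sketch of what the suggested reordering in FlinkRuleSets might look like; the real rule set contains many more entries and the final ordering may differ:

```scala
val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(

  // convert a logical table scan to a relational expression,
  // grouped with the other default Calcite rules
  TableScanRule.INSTANCE,
  // ... further Calcite optimization rules ...

  // Flink-specific translation rules
  DataSetCalcRule.INSTANCE,
  DataSetJoinRule.INSTANCE,
  DataSetScanRule.INSTANCE,
  DataSetUnionRule.INSTANCE
)
```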
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207253#comment-15207253 ] ASF GitHub Bot commented on FLINK-3639: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57065041 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala --- @@ -59,29 +64,55 @@ object TranslationContext { .traitDefs(ConventionTraitDef.INSTANCE) .build -tabNames = Map[AbstractTable, String]() - +tablesRegistry = Map[String, AbstractTable]() relBuilder = RelBuilder.create(frameworkConfig) - nameCntr.set(0) } def addDataSet(newTable: DataSetTable[_]): String = { --- End diff -- Refactor to `registerTable(AbstractTable)` as well? > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207249#comment-15207249 ] ASF GitHub Bot commented on FLINK-3639: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57064833 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala --- @@ -59,29 +64,55 @@ object TranslationContext { .traitDefs(ConventionTraitDef.INSTANCE) .build -tabNames = Map[AbstractTable, String]() - +tablesRegistry = Map[String, AbstractTable]() relBuilder = RelBuilder.create(frameworkConfig) - nameCntr.set(0) } def addDataSet(newTable: DataSetTable[_]): String = { +val tabName = "DataSetTable_" + nameCntr.getAndIncrement() +tables.add(tabName, newTable) +tabName + } + + @throws[TableException] + def addAndRegisterDataSet(table: DataSetTable[_], name: String): Unit = { --- End diff -- I think `addAndRegisterDataSet(DataSetTable, String)` and `registerTable(TableTable, String)` can be combined to `registerTable(AbstractTable, String)` > Add methods and utilities to register DataSets and Tables in the > TableEnvironment > - > > Key: FLINK-3639 > URL: https://issues.apache.org/jira/browse/FLINK-3639 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > > In order to make tables queryable from SQL we need to register them under a > unique name in the TableEnvironment. > [This design > document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit] > describes the proposed API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57065041 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala --- @@ -59,29 +64,55 @@ object TranslationContext { .traitDefs(ConventionTraitDef.INSTANCE) .build -tabNames = Map[AbstractTable, String]() - +tablesRegistry = Map[String, AbstractTable]() relBuilder = RelBuilder.create(frameworkConfig) - nameCntr.set(0) } def addDataSet(newTable: DataSetTable[_]): String = { --- End diff -- Refactor to `registerTable(AbstractTable)` as well? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1827#discussion_r57064833 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala --- @@ -59,29 +64,55 @@ object TranslationContext { .traitDefs(ConventionTraitDef.INSTANCE) .build -tabNames = Map[AbstractTable, String]() - +tablesRegistry = Map[String, AbstractTable]() relBuilder = RelBuilder.create(frameworkConfig) - nameCntr.set(0) } def addDataSet(newTable: DataSetTable[_]): String = { +val tabName = "DataSetTable_" + nameCntr.getAndIncrement() +tables.add(tabName, newTable) +tabName + } + + @throws[TableException] + def addAndRegisterDataSet(table: DataSetTable[_], name: String): Unit = { --- End diff -- I think `addAndRegisterDataSet(DataSetTable, String)` and `registerTable(TableTable, String)` can be combined to `registerTable(AbstractTable, String)` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-3656) Rework TableAPI tests
Vasia Kalavri created FLINK-3656: Summary: Rework TableAPI tests Key: FLINK-3656 URL: https://issues.apache.org/jira/browse/FLINK-3656 Project: Flink Issue Type: Improvement Components: Table API Reporter: Vasia Kalavri We should look into whether we could rework the Table API tests to extract checks of the Table API parts that are common to DataSet and DataStream. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3547) Add support for streaming projection, selection, and union
[ https://issues.apache.org/jira/browse/FLINK-3547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207188#comment-15207188 ] ASF GitHub Bot commented on FLINK-3547: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1820#issuecomment-19747 I have updated the PR. Thanks! > Add support for streaming projection, selection, and union > -- > > Key: FLINK-3547 > URL: https://issues.apache.org/jira/browse/FLINK-3547 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1820#issuecomment-19747 I have updated the PR. Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat
[ https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gna Phetsarath updated FLINK-3655: -- Description: Allow comma-separated or multiple directories to be specified for FileInputFormat so that a DataSource will process the directories sequentially. env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*") in Scala env.readFile(paths: Seq[String]) or env.readFile(path: String, otherPaths: String*) Wildcard support would be a bonus. was: Allow comma-separated directories to be specified for FileInputFormat so that a DataSource will process the directories sequentially. env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*") in Scala env.readFile(paths: Seq[String]) or env.readFile(path: String, otherPaths: String*) Wildcard support would be a bonus. > Allow comma-separated or multiple directories to be specified for > FileInputFormat > - > > Key: FLINK-3655 > URL: https://issues.apache.org/jira/browse/FLINK-3655 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Gna Phetsarath >Priority: Minor > > Allow comma-separated or multiple directories to be specified for > FileInputFormat so that a DataSource will process the directories > sequentially. > > env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*") > in Scala >env.readFile(paths: Seq[String]) > or > env.readFile(path: String, otherPaths: String*) > Wildcard support would be a bonus. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
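Until such an API exists, one way to approximate the request with the current Scala DataSet API is to create one source per directory and union them. This is only a sketch: it does not give the sequential, single-DataSource behaviour asked for above, and wildcards are not handled.

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val paths = Seq("/data/2016/01/01", "/data/2016/01/02", "/data/2016/01/03")

// one DataSource per directory, merged into a single DataSet
val lines: DataSet[String] = paths
  .map(path => env.readTextFile(path))
  .reduce((left, right) => left.union(right))
```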
[jira] [Updated] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat
[ https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gna Phetsarath updated FLINK-3655: -- Summary: Allow comma-separated or multiple directories to be specified for FileInputFormat (was: Allow comma-separated directories to be specified for FileInputFormat) > Allow comma-separated or multiple directories to be specified for > FileInputFormat > - > > Key: FLINK-3655 > URL: https://issues.apache.org/jira/browse/FLINK-3655 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Gna Phetsarath >Priority: Minor > > Allow comma-separated directories to be specified for FileInputFormat so that > a DataSource will process the directories sequentially. > > env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*") > in Scala >env.readFile(paths: Seq[String]) > or > env.readFile(path: String, otherPaths: String*) > Wildcard support would be a bonus. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3655) Allow comma-separated directories to be specified for FileInputFormat
[ https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gna Phetsarath updated FLINK-3655: -- Description: Allow comma-separated directories to be specified for FileInputFormat so that a DataSource will process the directories sequentially. env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*") in Scala env.readFile(paths: Seq[String]) or env.readFile(path: String, otherPaths: String*) Wildcard support would be a bonus. was: Allow comma-separated directories to be specified for FileInputFormat so that a DataSource will process the directories sequentially. env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")}} Wildcard support would be a bonus. > Allow comma-separated directories to be specified for FileInputFormat > - > > Key: FLINK-3655 > URL: https://issues.apache.org/jira/browse/FLINK-3655 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Gna Phetsarath >Priority: Minor > > Allow comma-separated directories to be specified for FileInputFormat so that > a DataSource will process the directories sequentially. > > env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*") > in Scala >env.readFile(paths: Seq[String]) > or > env.readFile(path: String, otherPaths: String*) > Wildcard support would be a bonus. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-3613) Add standard deviation, mean, variance to list of Aggregations
[ https://issues.apache.org/jira/browse/FLINK-3613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207054#comment-15207054 ] Todd Lisonbee edited comment on FLINK-3613 at 3/22/16 7:02 PM: --- Attached is a design for improvements to DataSet.aggregate() needed to implement additional aggregations like Standard Deviation. To maintain public API's it seems like the best path would be to have AggregateOperator implement CustomUnaryOperation but that seems weird because no other Operator is done that way. But other options I see don't seem consistent with other Operators either. I really could use some feedback on this. Thanks. Also, should I be posting this to the Dev mailing list? was (Author: tlisonbee): Attached is a design for improvements to DataSet.aggregate() needed to implement additional aggregations like Standard Deviation. To maintain public API's it seems like the best path would be to have AggregateOperator implement CustomUnaryOperation but that seems weird because no other Operator is done that way. But other options I see don't seem consistent with other Operators either. I really could use some feedback on this. Thanks. > Add standard deviation, mean, variance to list of Aggregations > -- > > Key: FLINK-3613 > URL: https://issues.apache.org/jira/browse/FLINK-3613 > Project: Flink > Issue Type: Improvement >Reporter: Todd Lisonbee >Priority: Minor > Attachments: DataSet-Aggregation-Design-March2016-v1.txt > > > Implement standard deviation, mean, variance for > org.apache.flink.api.java.aggregation.Aggregations > Ideally implementation should be single pass and numerically stable. > References: > "Scalable and Numerically Stable Descriptive Statistics in SystemML", Tian et > al, International Conference on Data Engineering 2012 > http://dl.acm.org/citation.cfm?id=2310392 > "The Kahan summation algorithm (also known as compensated summation) reduces > the numerical errors that occur when adding a sequence of finite precision > floating point numbers. Numerical errors arise due to truncation and > rounding. These errors can lead to numerical instability when calculating > variance." > https://en.wikipedia.org/wiki/Kahan_summation_algorithm -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3613) Add standard deviation, mean, variance to list of Aggregations
[ https://issues.apache.org/jira/browse/FLINK-3613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Lisonbee updated FLINK-3613: - Attachment: DataSet-Aggregation-Design-March2016-v1.txt Attached is a design for improvements to DataSet.aggregate() needed to implement additional aggregations like Standard Deviation. To maintain public API's it seems like the best path would be to have AggregateOperator implement CustomUnaryOperation but that seems weird because no other Operator is done that way. But other options I see don't seem consistent with other Operators either. I really could use some feedback on this. Thanks. > Add standard deviation, mean, variance to list of Aggregations > -- > > Key: FLINK-3613 > URL: https://issues.apache.org/jira/browse/FLINK-3613 > Project: Flink > Issue Type: Improvement >Reporter: Todd Lisonbee >Priority: Minor > Attachments: DataSet-Aggregation-Design-March2016-v1.txt > > > Implement standard deviation, mean, variance for > org.apache.flink.api.java.aggregation.Aggregations > Ideally implementation should be single pass and numerically stable. > References: > "Scalable and Numerically Stable Descriptive Statistics in SystemML", Tian et > al, International Conference on Data Engineering 2012 > http://dl.acm.org/citation.cfm?id=2310392 > "The Kahan summation algorithm (also known as compensated summation) reduces > the numerical errors that occur when adding a sequence of finite precision > floating point numbers. Numerical errors arise due to truncation and > rounding. These errors can lead to numerical instability when calculating > variance." > https://en.wikipedia.org/wiki/Kahan_summation_algorithm -- This message was sent by Atlassian JIRA (v6.3.4#6332)
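To illustrate the "single pass and numerically stable" requirement, below is a minimal accumulator based on Welford's online algorithm, with a merge step (Chan et al.) for combining per-partition results. It is in the same spirit as the cited references but is not the design from the attached document.

```scala
final class VarianceAccumulator extends Serializable {
  private var count: Long = 0L
  private var mean: Double = 0.0
  private var m2: Double = 0.0 // running sum of squared deviations from the current mean

  /** Adds one value in a single pass over the data. */
  def add(value: Double): Unit = {
    count += 1
    val delta = value - mean
    mean += delta / count
    m2 += delta * (value - mean)
  }

  /** Merges another partial accumulator (needed when partitions are aggregated in parallel). */
  def merge(other: VarianceAccumulator): Unit = {
    if (other.count > 0) {
      val total = count + other.count
      val delta = other.mean - mean
      m2 += other.m2 + delta * delta * count * other.count / total
      mean += delta * other.count / total
      count = total
    }
  }

  /** Returns (mean, sample variance, sample standard deviation). */
  def result: (Double, Double, Double) = {
    val variance = if (count > 1) m2 / (count - 1) else 0.0
    (mean, variance, math.sqrt(variance))
  }
}
```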
[jira] [Updated] (FLINK-3655) Allow comma-separated directories to be specified for FileInputFormat
[ https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gna Phetsarath updated FLINK-3655: -- Description: Allow comma-separated directories to be specified for FileInputFormat so that a DataSource will process the directories sequentially. env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")}} Wildcard support would be a bonus. was: Allow comma-separated multiple directories to be specified for FileInputFormat. env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")}} Wildcard support would be a bonus. > Allow comma-separated directories to be specified for FileInputFormat > - > > Key: FLINK-3655 > URL: https://issues.apache.org/jira/browse/FLINK-3655 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Gna Phetsarath >Priority: Minor > > Allow comma-separated directories to be specified for FileInputFormat so that > a DataSource will process the directories sequentially. > env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")}} > Wildcard support would be a bonus. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3655) Allow comma-separated directories to be specified for FileInputFormat
[ https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gna Phetsarath updated FLINK-3655: -- Summary: Allow comma-separated directories to be specified for FileInputFormat (was: Allow comma-separated multiple directories to be specified for FileInputFormat) > Allow comma-separated directories to be specified for FileInputFormat > - > > Key: FLINK-3655 > URL: https://issues.apache.org/jira/browse/FLINK-3655 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Gna Phetsarath >Priority: Minor > > Allow comma-separated multiple directories to be specified for > FileInputFormat. > env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")}} > Wildcard support would be a bonus. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3655) Allow comma-separated multiple directories to be specified for FileInputFormat
Gna Phetsarath created FLINK-3655: - Summary: Allow comma-separated multiple directories to be specified for FileInputFormat Key: FLINK-3655 URL: https://issues.apache.org/jira/browse/FLINK-3655 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.0.0 Reporter: Gna Phetsarath Priority: Minor Allow comma-separated multiple directories to be specified for FileInputFormat. env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*") Wildcard support would be a bonus. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs
[ https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206980#comment-15206980 ] Robert Metzger commented on FLINK-2821: --- That sounds like a very good idea! +1 > Change Akka configuration to allow accessing actors from different URLs > --- > > Key: FLINK-2821 > URL: https://issues.apache.org/jira/browse/FLINK-2821 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Reporter: Robert Metzger >Assignee: Maximilian Michels > > Akka expects the actor's URL to be exactly matching. > As pointed out here, cases where users were complaining about this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html > - Proxy routing (as described here, send to the proxy URL, receiver > recognizes only original URL) > - Using hostname / IP interchangeably does not work (we solved this by > always putting IP addresses into URLs, never hostnames) > - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still > no solution to that (but seems not too much of a restriction) > I am aware that this is not possible due to Akka, so it is actually not a > Flink bug. But I think we should track the resolution of the issue here > anyways because its affecting our user's satisfaction. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206875#comment-15206875 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1741#issuecomment-199934108 Thanks a lot @tillrohrmann for taking the time to look into the bulk of code. I have already eagerly addressed most of your comments in the additional commits I pushed. Next, I'll revise the cluster shutdown logic and the re-connect in case of unresponsiveness of the resource manager. > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/1741#issuecomment-199934108 Thanks a lot @tillrohrmann for taking the time to look into the bulk of code. I have already eagerly addressed most of your comments in the additional commits I pushed. Next, I'll revise the cluster shutdown logic and the re-connect in case of unresponsiveness of the resource manager. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57031901 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -312,59 +323,121 @@ class JobManager( leaderSessionID = None -case RegisterTaskManager( - connectionInfo, - hardwareInformation, - numberOfSlots) => +case msg: RegisterResourceManager => + log.debug(s"Resource manager registration: $msg") + + // ditch current resource manager (if any) + currentResourceManager = Option(msg.resourceManager()) + + val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( +instance => instance.getResourceId).toList.asJava + + // confirm registration and send known task managers with their resource ids + sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources)) + +case msg: DisconnectResourceManager => + log.debug(s"Resource manager disconnect: $msg") + + currentResourceManager match { +case Some(rm) if rm.equals(msg.resourceManager()) => + // we should ditch the current resource manager + log.debug(s"Disconnecting resource manager $rm.") + // send the old one a disconnect message + rm ! decorateMessage(new TriggerRegistrationAtJobManager(self)) + currentResourceManager = None +case None => + // not connected, thus ignoring this message + log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.") + } + +case msg @ RegisterTaskManager( + resourceId, + connectionInfo, + hardwareInformation, + numberOfSlots) => + // we are being informed by the ResourceManager that a new task manager is available + log.debug(s"RegisterTaskManager: $msg") val taskManager = sender() + currentResourceManager match { +case Some(rm) => + val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout) + future.onComplete { +case scala.util.Success(response) => + // the resource manager is available and answered + self ! response +case scala.util.Failure(t) => --- End diff -- Done (doesn't change the diff). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206850#comment-15206850 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57031901 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -312,59 +323,121 @@ class JobManager( leaderSessionID = None -case RegisterTaskManager( - connectionInfo, - hardwareInformation, - numberOfSlots) => +case msg: RegisterResourceManager => + log.debug(s"Resource manager registration: $msg") + + // ditch current resource manager (if any) + currentResourceManager = Option(msg.resourceManager()) + + val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( +instance => instance.getResourceId).toList.asJava + + // confirm registration and send known task managers with their resource ids + sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources)) + +case msg: DisconnectResourceManager => + log.debug(s"Resource manager disconnect: $msg") + + currentResourceManager match { +case Some(rm) if rm.equals(msg.resourceManager()) => + // we should ditch the current resource manager + log.debug(s"Disconnecting resource manager $rm.") + // send the old one a disconnect message + rm ! decorateMessage(new TriggerRegistrationAtJobManager(self)) + currentResourceManager = None +case None => + // not connected, thus ignoring this message + log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.") + } + +case msg @ RegisterTaskManager( + resourceId, + connectionInfo, + hardwareInformation, + numberOfSlots) => + // we are being informed by the ResourceManager that a new task manager is available + log.debug(s"RegisterTaskManager: $msg") val taskManager = sender() + currentResourceManager match { +case Some(rm) => + val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout) + future.onComplete { +case scala.util.Success(response) => + // the resource manager is available and answered + self ! response +case scala.util.Failure(t) => --- End diff -- Done (doesn't change the diff). > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206835#comment-15206835 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1741#issuecomment-199920379 Great work @mxm! I really like the new architecture :-) I had some minor comments. The only thing which is important to fix is that the `JobManager` can terminate if it is not connected to a `FlinkResourceManager`. Apart from that, I think we should merge it soon so that it can get some exposure. > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1741#issuecomment-199920379 Great work @mxm! I really like the new architecture :-) I had some minor comments. The only thing which is important to fix is that the `JobManager` can terminate if it is not connected to a `FlinkResourceManager`. Apart from that, I think we should merge it soon so that it can get some exposure. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206832#comment-15206832 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57030485 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java --- @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.Props; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.yarn.messages.ContainersAllocated; +import org.apache.flink.yarn.messages.ContainersComplete; + +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import org.slf4j.Logger; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Specialized Flink Resource Manager implementation for YARN clusters. It is started as the + * YARN ApplicationMaster and implements the YARN-specific logic for container requests and failure + * monitoring. 
+ */ +public class YarnFlinkResourceManager extends FlinkResourceManager { + + /** The heartbeat interval while the resource master is waiting for containers */ + private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500; + + /** The default heartbeat interval during regular operation */ + private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000; + + /** The containers where a TaskManager is starting and we are waiting for it to register */ + private final Map containersInLaunch; + + /** Containers we have released, where we are waiting for an acknowledgement that +* they are released */ + private final Map containersBeingReturned; + + /** The YARN / Hadoop configuration object */ + private final YarnConfiguration yarnConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final ContaineredTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final ContainerLaunchContext taskManagerLaunchContext; + + /** Host name for the container running this process */ + private fi
[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57030485 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java --- @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.Props; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.yarn.messages.ContainersAllocated; +import org.apache.flink.yarn.messages.ContainersComplete; + +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import org.slf4j.Logger; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Specialized Flink Resource Manager implementation for YARN clusters. It is started as the + * YARN ApplicationMaster and implements the YARN-specific logic for container requests and failure + * monitoring. 
+ */ +public class YarnFlinkResourceManager extends FlinkResourceManager { + + /** The heartbeat interval while the resource master is waiting for containers */ + private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500; + + /** The default heartbeat interval during regular operation */ + private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000; + + /** The containers where a TaskManager is starting and we are waiting for it to register */ + private final Map containersInLaunch; + + /** Containers we have released, where we are waiting for an acknowledgement that +* they are released */ + private final Map containersBeingReturned; + + /** The YARN / Hadoop configuration object */ + private final YarnConfiguration yarnConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final ContaineredTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final ContainerLaunchContext taskManagerLaunchContext; + + /** Host name for the container running this process */ + private final String applicationMasterHostName; + + /** Web interface URL, may be null */ + private final String webInterfaceURL; + + /** Default heartbeat interval between this actor and the YARN resource manager */ + private final
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206815#comment-15206815 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57028233 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -312,59 +323,121 @@ class JobManager( leaderSessionID = None -case RegisterTaskManager( - connectionInfo, - hardwareInformation, - numberOfSlots) => +case msg: RegisterResourceManager => + log.debug(s"Resource manager registration: $msg") + + // ditch current resource manager (if any) + currentResourceManager = Option(msg.resourceManager()) + + val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( +instance => instance.getResourceId).toList.asJava + + // confirm registration and send known task managers with their resource ids + sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources)) + +case msg: DisconnectResourceManager => + log.debug(s"Resource manager disconnect: $msg") + + currentResourceManager match { +case Some(rm) if rm.equals(msg.resourceManager()) => + // we should ditch the current resource manager + log.debug(s"Disconnecting resource manager $rm.") + // send the old one a disconnect message + rm ! decorateMessage(new TriggerRegistrationAtJobManager(self)) + currentResourceManager = None +case None => + // not connected, thus ignoring this message + log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.") + } + +case msg @ RegisterTaskManager( + resourceId, + connectionInfo, + hardwareInformation, + numberOfSlots) => + // we are being informed by the ResourceManager that a new task manager is available + log.debug(s"RegisterTaskManager: $msg") val taskManager = sender() + currentResourceManager match { +case Some(rm) => + val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout) + future.onComplete { +case scala.util.Success(response) => + // the resource manager is available and answered + self ! response +case scala.util.Failure(t) => --- End diff -- Could be good to log the failure. > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57028233 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -312,59 +323,121 @@ class JobManager( leaderSessionID = None -case RegisterTaskManager( - connectionInfo, - hardwareInformation, - numberOfSlots) => +case msg: RegisterResourceManager => + log.debug(s"Resource manager registration: $msg") + + // ditch current resource manager (if any) + currentResourceManager = Option(msg.resourceManager()) + + val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map( +instance => instance.getResourceId).toList.asJava + + // confirm registration and send known task managers with their resource ids + sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, taskManagerResources)) + +case msg: DisconnectResourceManager => + log.debug(s"Resource manager disconnect: $msg") + + currentResourceManager match { +case Some(rm) if rm.equals(msg.resourceManager()) => + // we should ditch the current resource manager + log.debug(s"Disconnecting resource manager $rm.") + // send the old one a disconnect message + rm ! decorateMessage(new TriggerRegistrationAtJobManager(self)) + currentResourceManager = None +case None => + // not connected, thus ignoring this message + log.warn(s"No resource manager ${msg.resourceManager()} connected. Can't disconnect.") + } + +case msg @ RegisterTaskManager( + resourceId, + connectionInfo, + hardwareInformation, + numberOfSlots) => + // we are being informed by the ResourceManager that a new task manager is available + log.debug(s"RegisterTaskManager: $msg") val taskManager = sender() + currentResourceManager match { +case Some(rm) => + val future = (rm ? decorateMessage(new RegisterResource(taskManager, msg)))(timeout) + future.onComplete { +case scala.util.Success(response) => + // the resource manager is available and answered + self ! response +case scala.util.Failure(t) => --- End diff -- Could be good to log the failure. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
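[Editor's note] The review comment above only asks that the registration failure be logged instead of silently dropped. As a rough illustration (a Java analogue using CompletableFuture and SLF4J, not the actual Scala change to JobManager.scala; class and variable names are hypothetical), the suggested behavior could look like this:

```java
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hypothetical sketch of the review suggestion: when the RegisterResource
 * round trip to the ResourceManager fails, log the throwable instead of
 * dropping it silently.
 */
public class RegistrationFailureLoggingSketch {

	private static final Logger LOG = LoggerFactory.getLogger(RegistrationFailureLoggingSketch.class);

	public static void main(String[] args) {
		// stands in for the ask() future sent to the resource manager
		CompletableFuture<Object> registration = new CompletableFuture<>();
		registration.completeExceptionally(new RuntimeException("ResourceManager unreachable"));

		registration.whenComplete((response, failure) -> {
			if (failure == null) {
				// success: forward the response, as the JobManager does with `self ! response`
				LOG.info("TaskManager registration acknowledged: {}", response);
			} else {
				// the review comment: make the failure visible in the logs
				LOG.warn("TaskManager registration at the ResourceManager failed", failure);
			}
		});
	}
}
```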
[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs
[ https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206810#comment-15206810 ] Stephan Ewen commented on FLINK-2821: - How about the following solution (multiple steps): - From Flink 1.1 on, we make Scala 2.11 the default Scala version. That version is stable and many people seem to use it anyway (my impression from the mailing lists) - We make Akka 2.4.x the default Akka version - The "change-scala-version.sh" script needs to adjust both Scala and Akka versions. That way, the default releases (for users who don't care about the Scala version) would have the fix; only builds specifically downgraded to an older Scala version would keep the restriction. > Change Akka configuration to allow accessing actors from different URLs > --- > > Key: FLINK-2821 > URL: https://issues.apache.org/jira/browse/FLINK-2821 > Project: Flink > Issue Type: Bug > Components: Distributed Runtime >Reporter: Robert Metzger >Assignee: Maximilian Michels > > Akka expects the actor's URL to match exactly. > As pointed out here, cases where users were complaining about this: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html > - Proxy routing (as described here, send to the proxy URL, receiver > recognizes only the original URL) > - Using hostname / IP interchangeably does not work (we solved this by > always putting IP addresses into URLs, never hostnames) > - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still > no solution to that (but it seems not too much of a restriction) > I am aware that this is not possible due to Akka, so it is actually not a > Flink bug. But I think we should track the resolution of the issue here > anyway because it affects our users' satisfaction. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3654) Disable Write-Ahead-Log in RocksDB State
[ https://issues.apache.org/jira/browse/FLINK-3654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-3654: Component/s: (was: Sure) Streaming > Disable Write-Ahead-Log in RocksDB State > > > Key: FLINK-3654 > URL: https://issues.apache.org/jira/browse/FLINK-3654 > Project: Flink > Issue Type: Improvement > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > We do our own checkpointing of the RocksDB database so the WAL is useless to > us. Disabling writes to the WAL should give us a very large performance boost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3654) Disable Write-Ahead-Log in RocksDB State
Aljoscha Krettek created FLINK-3654: --- Summary: Disable Write-Ahead-Log in RocksDB State Key: FLINK-3654 URL: https://issues.apache.org/jira/browse/FLINK-3654 Project: Flink Issue Type: Improvement Components: Sure Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek We do our own checkpointing of the RocksDB database so the WAL is useless to us. Disabling writes to the WAL should give us a very large performance boost. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
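[Editor's note] For context, disabling the write-ahead log is a per-write setting in the RocksDB Java API. A minimal standalone sketch of what the issue describes (assuming a recent rocksdbjni on the classpath; the path and key/value bytes are made up for illustration, this is not the Flink state backend code itself):

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

/**
 * Minimal sketch of writing to RocksDB with the write-ahead log disabled.
 * When the embedding system snapshots the whole database itself (as Flink
 * does during checkpoints), the WAL adds write amplification without
 * adding safety.
 */
public class DisableWalSketch {

	public static void main(String[] args) throws RocksDBException {
		RocksDB.loadLibrary();

		try (Options options = new Options().setCreateIfMissing(true);
				RocksDB db = RocksDB.open(options, "/tmp/rocksdb-wal-sketch");
				WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) {

			// every put issued with these WriteOptions skips the WAL
			db.put(writeOptions, "key".getBytes(), "value".getBytes());
		}
	}
}
```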
[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206777#comment-15206777 ] Robert Metzger commented on FLINK-3211: --- Great, thank you. I'm looking forward reviewing it. > Add AWS Kinesis streaming connector > --- > > Key: FLINK-3211 > URL: https://issues.apache.org/jira/browse/FLINK-3211 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Original Estimate: 336h > Remaining Estimate: 336h > > AWS Kinesis is a widely adopted message queue used by AWS users, much like a > cloud service version of Apache Kafka. Support for AWS Kinesis will be a > great addition to the handful of Flink's streaming connectors to external > systems and a great reach out to the AWS community. > AWS supports two different ways to consume Kinesis data: with the low-level > AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK > can be used to consume Kinesis data, including stream read beginning from a > specific offset (or "record sequence number" in Kinesis terminology). On the > other hand, AWS officially recommends using KCL, which offers a higher-level > of abstraction that also comes with checkpointing and failure recovery by > using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state > storage. > However, KCL is essentially a stream processing library that wraps all the > partition-to-task (or "shard" in Kinesis terminology) determination and > checkpointing to allow the user to focus only on streaming application logic. > This leads to the understanding that we can not use the KCL to implement the > Kinesis streaming connector if we are aiming for a deep integration of Flink > with Kinesis that provides exactly once guarantees (KCL promises only > at-least-once). Therefore, AWS SDK will be the way to go for the > implementation of this feature. > With the ability to read from specific offsets, and also the fact that > Kinesis and Kafka share a lot of similarities, the basic principles of the > implementation of Flink's Kinesis streaming connector will very much resemble > the Kafka connector. We can basically follow the outlines described in > [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector > implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka > differences is described as following: > 1. While the Kafka connector can support reading from multiple topics, I > currently don't think this is a good idea for Kinesis streams (a Kinesis > Stream is logically equivalent to a Kafka topic). Kinesis streams can exist > in different AWS regions, and each Kinesis stream under the same AWS user > account may have completely independent access settings with different > authorization keys. Overall, a Kinesis stream feels like a much more > consolidated resource compared to Kafka topics. It would be great to hear > more thoughts on this part. > 2. While Kafka has brokers that can hold multiple partitions, the only > partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast > to the Kafka connector having per broker connections where the connections > can handle multiple Kafka partitions, the Kinesis connector will only need to > have simple per shard connections. > 3. Kinesis itself does not support committing offsets back to Kinesis. 
If we > were to implement this feature like the Kafka connector with Kafka / ZK to > sync outside view of progress, we probably could use ZK or DynamoDB like the > way KCL works. More thoughts on this part will be very helpful too. > As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis > Producer Library) [5]. However, for higher code consistency with the proposed > Kinesis Consumer, I think it will be better to stick with the AWS SDK for the > implementation. The implementation should be straight forward, being almost > if not completely the same as the Kafka sink. > References: > [1] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html > [2] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html > [3] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html > [4] http://data-artisans.com/kafka-flink-a-practical-how-to/ > [5] > http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
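[Editor's note] To make the per-shard consumption model described in the issue concrete, here is a rough standalone sketch against the low-level AWS SDK for Java (v1). The stream name, shard id, and credentials handling are placeholder assumptions; this is only an illustration of reading one shard via a shard iterator, not the proposed connector implementation:

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.Record;

/**
 * Illustrative sketch of per-shard consumption with the low-level AWS SDK:
 * obtain a shard iterator, poll records, and remember the last sequence
 * number so a restart could resume from it. "my-stream" and the shard id
 * are placeholder values.
 */
public class KinesisShardReadSketch {

	public static void main(String[] args) throws InterruptedException {
		AmazonKinesisClient kinesis = new AmazonKinesisClient(); // default credential chain

		GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest()
			.withStreamName("my-stream")
			.withShardId("shardId-000000000000")
			.withShardIteratorType("TRIM_HORIZON"); // or AT_SEQUENCE_NUMBER when resuming

		String shardIterator = kinesis.getShardIterator(iteratorRequest).getShardIterator();

		while (shardIterator != null) {
			GetRecordsResult result = kinesis.getRecords(
				new GetRecordsRequest().withShardIterator(shardIterator).withLimit(100));

			List<Record> records = result.getRecords();
			for (Record record : records) {
				String payload = StandardCharsets.UTF_8.decode(record.getData()).toString();
				// the sequence number is what a checkpoint would have to remember
				System.out.println(record.getSequenceNumber() + " -> " + payload);
			}

			shardIterator = result.getNextShardIterator();
			Thread.sleep(1000); // stay under the per-shard GetRecords rate limit
		}
	}
}
```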
[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206770#comment-15206770 ] Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 3/22/16 4:52 PM: - No problem :) I'll have it ready soon, hopefully by the end of March. was (Author: tzulitai): No problem :) I'll have it ready soon. > Add AWS Kinesis streaming connector > --- > > Key: FLINK-3211 > URL: https://issues.apache.org/jira/browse/FLINK-3211 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Original Estimate: 336h > Remaining Estimate: 336h > > AWS Kinesis is a widely adopted message queue used by AWS users, much like a > cloud service version of Apache Kafka. Support for AWS Kinesis will be a > great addition to the handful of Flink's streaming connectors to external > systems and a great reach out to the AWS community. > AWS supports two different ways to consume Kinesis data: with the low-level > AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK > can be used to consume Kinesis data, including stream read beginning from a > specific offset (or "record sequence number" in Kinesis terminology). On the > other hand, AWS officially recommends using KCL, which offers a higher-level > of abstraction that also comes with checkpointing and failure recovery by > using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state > storage. > However, KCL is essentially a stream processing library that wraps all the > partition-to-task (or "shard" in Kinesis terminology) determination and > checkpointing to allow the user to focus only on streaming application logic. > This leads to the understanding that we can not use the KCL to implement the > Kinesis streaming connector if we are aiming for a deep integration of Flink > with Kinesis that provides exactly once guarantees (KCL promises only > at-least-once). Therefore, AWS SDK will be the way to go for the > implementation of this feature. > With the ability to read from specific offsets, and also the fact that > Kinesis and Kafka share a lot of similarities, the basic principles of the > implementation of Flink's Kinesis streaming connector will very much resemble > the Kafka connector. We can basically follow the outlines described in > [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector > implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka > differences is described as following: > 1. While the Kafka connector can support reading from multiple topics, I > currently don't think this is a good idea for Kinesis streams (a Kinesis > Stream is logically equivalent to a Kafka topic). Kinesis streams can exist > in different AWS regions, and each Kinesis stream under the same AWS user > account may have completely independent access settings with different > authorization keys. Overall, a Kinesis stream feels like a much more > consolidated resource compared to Kafka topics. It would be great to hear > more thoughts on this part. > 2. While Kafka has brokers that can hold multiple partitions, the only > partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast > to the Kafka connector having per broker connections where the connections > can handle multiple Kafka partitions, the Kinesis connector will only need to > have simple per shard connections. > 3. 
Kinesis itself does not support committing offsets back to Kinesis. If we > were to implement this feature like the Kafka connector with Kafka / ZK to > sync outside view of progress, we probably could use ZK or DynamoDB like the > way KCL works. More thoughts on this part will be very helpful too. > As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis > Producer Library) [5]. However, for higher code consistency with the proposed > Kinesis Consumer, I think it will be better to stick with the AWS SDK for the > implementation. The implementation should be straight forward, being almost > if not completely the same as the Kafka sink. > References: > [1] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html > [2] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html > [3] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html > [4] http://data-artisans.com/kafka-flink-a-practical-how-to/ > [5] > http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206770#comment-15206770 ] Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 3/22/16 4:51 PM: - No problem :) I'll have it ready soon. was (Author: tzulitai): No problem :) > Add AWS Kinesis streaming connector > --- > > Key: FLINK-3211 > URL: https://issues.apache.org/jira/browse/FLINK-3211 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Original Estimate: 336h > Remaining Estimate: 336h > > AWS Kinesis is a widely adopted message queue used by AWS users, much like a > cloud service version of Apache Kafka. Support for AWS Kinesis will be a > great addition to the handful of Flink's streaming connectors to external > systems and a great reach out to the AWS community. > AWS supports two different ways to consume Kinesis data: with the low-level > AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK > can be used to consume Kinesis data, including stream read beginning from a > specific offset (or "record sequence number" in Kinesis terminology). On the > other hand, AWS officially recommends using KCL, which offers a higher-level > of abstraction that also comes with checkpointing and failure recovery by > using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state > storage. > However, KCL is essentially a stream processing library that wraps all the > partition-to-task (or "shard" in Kinesis terminology) determination and > checkpointing to allow the user to focus only on streaming application logic. > This leads to the understanding that we can not use the KCL to implement the > Kinesis streaming connector if we are aiming for a deep integration of Flink > with Kinesis that provides exactly once guarantees (KCL promises only > at-least-once). Therefore, AWS SDK will be the way to go for the > implementation of this feature. > With the ability to read from specific offsets, and also the fact that > Kinesis and Kafka share a lot of similarities, the basic principles of the > implementation of Flink's Kinesis streaming connector will very much resemble > the Kafka connector. We can basically follow the outlines described in > [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector > implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka > differences is described as following: > 1. While the Kafka connector can support reading from multiple topics, I > currently don't think this is a good idea for Kinesis streams (a Kinesis > Stream is logically equivalent to a Kafka topic). Kinesis streams can exist > in different AWS regions, and each Kinesis stream under the same AWS user > account may have completely independent access settings with different > authorization keys. Overall, a Kinesis stream feels like a much more > consolidated resource compared to Kafka topics. It would be great to hear > more thoughts on this part. > 2. While Kafka has brokers that can hold multiple partitions, the only > partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast > to the Kafka connector having per broker connections where the connections > can handle multiple Kafka partitions, the Kinesis connector will only need to > have simple per shard connections. > 3. Kinesis itself does not support committing offsets back to Kinesis. 
If we > were to implement this feature like the Kafka connector with Kafka / ZK to > sync outside view of progress, we probably could use ZK or DynamoDB like the > way KCL works. More thoughts on this part will be very helpful too. > As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis > Producer Library) [5]. However, for higher code consistency with the proposed > Kinesis Consumer, I think it will be better to stick with the AWS SDK for the > implementation. The implementation should be straight forward, being almost > if not completely the same as the Kafka sink. > References: > [1] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html > [2] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html > [3] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html > [4] http://data-artisans.com/kafka-flink-a-practical-how-to/ > [5] > http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206770#comment-15206770 ] Tzu-Li (Gordon) Tai commented on FLINK-3211: No problem :) > Add AWS Kinesis streaming connector > --- > > Key: FLINK-3211 > URL: https://issues.apache.org/jira/browse/FLINK-3211 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Original Estimate: 336h > Remaining Estimate: 336h > > AWS Kinesis is a widely adopted message queue used by AWS users, much like a > cloud service version of Apache Kafka. Support for AWS Kinesis will be a > great addition to the handful of Flink's streaming connectors to external > systems and a great reach out to the AWS community. > AWS supports two different ways to consume Kinesis data: with the low-level > AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK > can be used to consume Kinesis data, including stream read beginning from a > specific offset (or "record sequence number" in Kinesis terminology). On the > other hand, AWS officially recommends using KCL, which offers a higher-level > of abstraction that also comes with checkpointing and failure recovery by > using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state > storage. > However, KCL is essentially a stream processing library that wraps all the > partition-to-task (or "shard" in Kinesis terminology) determination and > checkpointing to allow the user to focus only on streaming application logic. > This leads to the understanding that we can not use the KCL to implement the > Kinesis streaming connector if we are aiming for a deep integration of Flink > with Kinesis that provides exactly once guarantees (KCL promises only > at-least-once). Therefore, AWS SDK will be the way to go for the > implementation of this feature. > With the ability to read from specific offsets, and also the fact that > Kinesis and Kafka share a lot of similarities, the basic principles of the > implementation of Flink's Kinesis streaming connector will very much resemble > the Kafka connector. We can basically follow the outlines described in > [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector > implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka > differences is described as following: > 1. While the Kafka connector can support reading from multiple topics, I > currently don't think this is a good idea for Kinesis streams (a Kinesis > Stream is logically equivalent to a Kafka topic). Kinesis streams can exist > in different AWS regions, and each Kinesis stream under the same AWS user > account may have completely independent access settings with different > authorization keys. Overall, a Kinesis stream feels like a much more > consolidated resource compared to Kafka topics. It would be great to hear > more thoughts on this part. > 2. While Kafka has brokers that can hold multiple partitions, the only > partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast > to the Kafka connector having per broker connections where the connections > can handle multiple Kafka partitions, the Kinesis connector will only need to > have simple per shard connections. > 3. Kinesis itself does not support committing offsets back to Kinesis. 
If we > were to implement this feature like the Kafka connector with Kafka / ZK to > sync outside view of progress, we probably could use ZK or DynamoDB like the > way KCL works. More thoughts on this part will be very helpful too. > As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis > Producer Library) [5]. However, for higher code consistency with the proposed > Kinesis Consumer, I think it will be better to stick with the AWS SDK for the > implementation. The implementation should be straight forward, being almost > if not completely the same as the Kafka sink. > References: > [1] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html > [2] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html > [3] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html > [4] http://data-artisans.com/kafka-flink-a-practical-how-to/ > [5] > http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3579) Improve String concatenation
[ https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206764#comment-15206764 ] ASF GitHub Bot commented on FLINK-3579: --- Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1821#issuecomment-199900035 Updated PR. RexNodeTranslator had a line longer than 100 characters; that is now corrected. > Improve String concatenation > > > Key: FLINK-3579 > URL: https://issues.apache.org/jira/browse/FLINK-3579 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Timo Walther >Assignee: ramkrishna.s.vasudevan >Priority: Minor > > Concatenation of a String and non-String does not work properly. > e.g. {{f0 + 42}} leads to RelBuilder Exception > ExpressionParser does not like {{f0 + 42.cast(STRING)}} either. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)
Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1821#issuecomment-199900035 Updated PR. RexNodeTranslator had a line longer than 100 characters; that is now corrected. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206695#comment-15206695 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57017514 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java --- @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.Props; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.yarn.messages.ContainersAllocated; +import org.apache.flink.yarn.messages.ContainersComplete; + +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import org.slf4j.Logger; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Specialized Flink Resource Manager implementation for YARN clusters. It is started as the + * YARN ApplicationMaster and implements the YARN-specific logic for container requests and failure + * monitoring. 
+ */ +public class YarnFlinkResourceManager extends FlinkResourceManager { + + /** The heartbeat interval while the resource master is waiting for containers */ + private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500; + + /** The default heartbeat interval during regular operation */ + private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000; + + /** The containers where a TaskManager is starting and we are waiting for it to register */ + private final Map containersInLaunch; + + /** Containers we have released, where we are waiting for an acknowledgement that +* they are released */ + private final Map containersBeingReturned; + + /** The YARN / Hadoop configuration object */ + private final YarnConfiguration yarnConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final ContaineredTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final ContainerLaunchContext taskManagerLaunchContext; + + /** Host name for the container running this process */ + p
[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206755#comment-15206755 ] Robert Metzger commented on FLINK-3211: --- Hi Tzu, thank you for your response. I'm happy to see that you are concerned about having good tests for the code. We have those tests for all connectors which connect with an open source system, running on the JVM. But for example our RabbitMQ connector also doesn't contain any tests connecting to a real RMQ. There are some tests using a mocked RMQ. Its fine to follow the same approach with the Kinesis connector. Lets just manually test the code on AWS. Please open a pull request as soon as you have it ready so that we can start reviewing it. It would be nice to put it into the Flink 1.1 release. > Add AWS Kinesis streaming connector > --- > > Key: FLINK-3211 > URL: https://issues.apache.org/jira/browse/FLINK-3211 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Original Estimate: 336h > Remaining Estimate: 336h > > AWS Kinesis is a widely adopted message queue used by AWS users, much like a > cloud service version of Apache Kafka. Support for AWS Kinesis will be a > great addition to the handful of Flink's streaming connectors to external > systems and a great reach out to the AWS community. > AWS supports two different ways to consume Kinesis data: with the low-level > AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK > can be used to consume Kinesis data, including stream read beginning from a > specific offset (or "record sequence number" in Kinesis terminology). On the > other hand, AWS officially recommends using KCL, which offers a higher-level > of abstraction that also comes with checkpointing and failure recovery by > using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state > storage. > However, KCL is essentially a stream processing library that wraps all the > partition-to-task (or "shard" in Kinesis terminology) determination and > checkpointing to allow the user to focus only on streaming application logic. > This leads to the understanding that we can not use the KCL to implement the > Kinesis streaming connector if we are aiming for a deep integration of Flink > with Kinesis that provides exactly once guarantees (KCL promises only > at-least-once). Therefore, AWS SDK will be the way to go for the > implementation of this feature. > With the ability to read from specific offsets, and also the fact that > Kinesis and Kafka share a lot of similarities, the basic principles of the > implementation of Flink's Kinesis streaming connector will very much resemble > the Kafka connector. We can basically follow the outlines described in > [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector > implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka > differences is described as following: > 1. While the Kafka connector can support reading from multiple topics, I > currently don't think this is a good idea for Kinesis streams (a Kinesis > Stream is logically equivalent to a Kafka topic). Kinesis streams can exist > in different AWS regions, and each Kinesis stream under the same AWS user > account may have completely independent access settings with different > authorization keys. Overall, a Kinesis stream feels like a much more > consolidated resource compared to Kafka topics. 
It would be great to hear > more thoughts on this part. > 2. While Kafka has brokers that can hold multiple partitions, the only > partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast > to the Kafka connector having per broker connections where the connections > can handle multiple Kafka partitions, the Kinesis connector will only need to > have simple per shard connections. > 3. Kinesis itself does not support committing offsets back to Kinesis. If we > were to implement this feature like the Kafka connector with Kafka / ZK to > sync outside view of progress, we probably could use ZK or DynamoDB like the > way KCL works. More thoughts on this part will be very helpful too. > As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis > Producer Library) [5]. However, for higher code consistency with the proposed > Kinesis Consumer, I think it will be better to stick with the AWS SDK for the > implementation. The implementation should be straight forward, being almost > if not completely the same as the Kafka sink. > References: > [1] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html > [2] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-co
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206753#comment-15206753 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57021932 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.Option; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ResourceManagerITCase { + + private static ActorSystem system; + + private static Configuration config = new Configuration(); + + @Before + public void setup() { + system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); + } + + @After + public void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** +* Tests whether the resource manager connects and reconciles existing task managers. +*/ + @Test + public void testResourceManagerReconciliation() { + + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { + @Override + protected void run() { + + ActorGateway jobManager = TestingUtils.createJobManager(system, config); + ActorGateway me = + TestingUtils.createForwardingActor(system, getTestActor(), Option.empty()); + + // !! no resource manager started !! 
+ + ResourceID resourceID = ResourceID.generate(); + + jobManager.tell( + new RegistrationMessages.RegisterTaskManager( + resourceID, + Mockito.mock(InstanceConnectionInfo.class), + null, + 1), + me); + + expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class); + + // now start the resource manager + ActorGateway resourceManager = + TestingUtils.createResourceManager(system, jobManager.actor(), config); + + // register at testing job manager to receive a message once a resource manager registers + resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me); + + // Wait for resource manager + expectMsgEquals(Messages.getAcknowledge()); + + // check if we registered the task manager resource +
[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57021932 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.TestingResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import scala.Option; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ResourceManagerITCase { + + private static ActorSystem system; + + private static Configuration config = new Configuration(); + + @Before + public void setup() { + system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig()); + } + + @After + public void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + /** +* Tests whether the resource manager connects and reconciles existing task managers. +*/ + @Test + public void testResourceManagerReconciliation() { + + new JavaTestKit(system){{ + new Within(duration("10 seconds")) { + @Override + protected void run() { + + ActorGateway jobManager = TestingUtils.createJobManager(system, config); + ActorGateway me = + TestingUtils.createForwardingActor(system, getTestActor(), Option.empty()); + + // !! no resource manager started !! 
+ + ResourceID resourceID = ResourceID.generate(); + + jobManager.tell( + new RegistrationMessages.RegisterTaskManager( + resourceID, + Mockito.mock(InstanceConnectionInfo.class), + null, + 1), + me); + + expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class); + + // now start the resource manager + ActorGateway resourceManager = + TestingUtils.createResourceManager(system, jobManager.actor(), config); + + // register at testing job manager to receive a message once a resource manager registers + resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me); + + // Wait for resource manager + expectMsgEquals(Messages.getAcknowledge()); + + // check if we registered the task manager resource + resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), me); + + TestingResourceManager.GetRegisteredResourcesReply reply = + expectMsgClass(TestingResourceManager.GetRegiste
[jira] [Commented] (FLINK-3579) Improve String concatenation
[ https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206750#comment-15206750 ] ASF GitHub Bot commented on FLINK-3579: --- Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1821#issuecomment-199897376 @vasia Thanks for the info. Let me check that and update the PR. > Improve String concatenation > > > Key: FLINK-3579 > URL: https://issues.apache.org/jira/browse/FLINK-3579 > Project: Flink > Issue Type: Bug > Components: Table API >Reporter: Timo Walther >Assignee: ramkrishna.s.vasudevan >Priority: Minor > > Concatenation of a String and non-String does not work properly. > e.g. {{f0 + 42}} leads to RelBuilder Exception > ExpressionParser does not like {{f0 + 42.cast(STRING)}} either. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)
Github user ramkrish86 commented on the pull request: https://github.com/apache/flink/pull/1821#issuecomment-199897376 @vasia Thanks for the info. Let me check that and update the PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206739#comment-15206739 ] Tzu-Li (Gordon) Tai commented on FLINK-3211: Hi, thanks Roy. I don't think using the actual Kinesis API will be feasible since the tests will be part of the build. I did take a look at kinesalite at first and gave it a try, but was still quite tedious to integrate it into the code base for the sake of tests since it isn't Java/Scala. I am still quite new and unfamiliar with PR to the Flink community, and was uncertain of sending out the PR before all tests were properly covered. Nevertheless, if integration tests aren't too much of an issue for the connector and the connector is urgently needed, I can start polishing things up and prepare the PR! > Add AWS Kinesis streaming connector > --- > > Key: FLINK-3211 > URL: https://issues.apache.org/jira/browse/FLINK-3211 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Original Estimate: 336h > Remaining Estimate: 336h > > AWS Kinesis is a widely adopted message queue used by AWS users, much like a > cloud service version of Apache Kafka. Support for AWS Kinesis will be a > great addition to the handful of Flink's streaming connectors to external > systems and a great reach out to the AWS community. > AWS supports two different ways to consume Kinesis data: with the low-level > AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK > can be used to consume Kinesis data, including stream read beginning from a > specific offset (or "record sequence number" in Kinesis terminology). On the > other hand, AWS officially recommends using KCL, which offers a higher-level > of abstraction that also comes with checkpointing and failure recovery by > using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state > storage. > However, KCL is essentially a stream processing library that wraps all the > partition-to-task (or "shard" in Kinesis terminology) determination and > checkpointing to allow the user to focus only on streaming application logic. > This leads to the understanding that we can not use the KCL to implement the > Kinesis streaming connector if we are aiming for a deep integration of Flink > with Kinesis that provides exactly once guarantees (KCL promises only > at-least-once). Therefore, AWS SDK will be the way to go for the > implementation of this feature. > With the ability to read from specific offsets, and also the fact that > Kinesis and Kafka share a lot of similarities, the basic principles of the > implementation of Flink's Kinesis streaming connector will very much resemble > the Kafka connector. We can basically follow the outlines described in > [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector > implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka > differences is described as following: > 1. While the Kafka connector can support reading from multiple topics, I > currently don't think this is a good idea for Kinesis streams (a Kinesis > Stream is logically equivalent to a Kafka topic). Kinesis streams can exist > in different AWS regions, and each Kinesis stream under the same AWS user > account may have completely independent access settings with different > authorization keys. Overall, a Kinesis stream feels like a much more > consolidated resource compared to Kafka topics. 
It would be great to hear > more thoughts on this part. > 2. While Kafka has brokers that can hold multiple partitions, the only > partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast > to the Kafka connector having per broker connections where the connections > can handle multiple Kafka partitions, the Kinesis connector will only need to > have simple per shard connections. > 3. Kinesis itself does not support committing offsets back to Kinesis. If we > were to implement this feature like the Kafka connector with Kafka / ZK to > sync outside view of progress, we probably could use ZK or DynamoDB like the > way KCL works. More thoughts on this part will be very helpful too. > As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis > Producer Library) [5]. However, for higher code consistency with the proposed > Kinesis Consumer, I think it will be better to stick with the AWS SDK for the > implementation. The implementation should be straight forward, being almost > if not completely the same as the Kafka sink. > References: > [1] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html > [2] > http://docs.aws.amazon.com/kinesis/latest/dev
[jira] [Commented] (FLINK-2946) Add orderBy() to Table API
[ https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206731#comment-15206731 ] Dawid Wysakowicz commented on FLINK-2946: - Hi, I have started working on this issue and would be really grateful if someone could have a quick look at what I did so far. Did I go in the right direction? Also, I am not sure how to approach the parallelism/sortPartition/partitionByRange issue. Generally, any advice is welcome. The changes are here: https://github.com/dawidwys/flink/tree/tableSort > Add orderBy() to Table API > -- > > Key: FLINK-2946 > URL: https://issues.apache.org/jira/browse/FLINK-2946 > Project: Flink > Issue Type: New Feature > Components: Table API >Reporter: Timo Walther >Assignee: Dawid Wysakowicz > > In order to implement a FLINK-2099 prototype that uses the Table API's code > generation facilities, the Table API needs a sorting feature. > I would implement it in the next days. Ideas on how to implement such a sorting > feature are very welcome. Is there a more efficient way than > {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on the > nodes first and finally sort on one node afterwards? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
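[Editor's note] For reference, the "sort everything on one node" baseline mentioned in the issue is easy to express with the DataSet API. A minimal, illustrative sketch (this is the fallback that a smarter orderBy(), e.g. range-partitioning first and sorting partitions in parallel, would aim to improve on):

```java
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * Baseline total ordering for a DataSet: funnel everything into a single
 * partition and sort it there. Simple, but the parallelism-1 sort task is
 * the bottleneck that motivates the discussion in FLINK-2946.
 */
public class GlobalSortSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Integer, String>> input = env.fromElements(
			new Tuple2<>(3, "c"), new Tuple2<>(1, "a"), new Tuple2<>(2, "b"));

		// single partition, then sort that partition on field 0
		input
			.sortPartition(0, Order.ASCENDING).setParallelism(1)
			.print();
	}
}
```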
[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206709#comment-15206709 ] Tzu-Li (Gordon) Tai commented on FLINK-3211: Hi Roy & Robert Apologies for the late reply. I already have a working version that I have been using in non-production environments for a while now. The main reason for not sending a PR yet is because of the lack of tests for the Kinesis producer. The main issue is with integration tests, which is hard since Kinesis is an AWS service and doesn't come with a local version. If this isn't too much of an issue, I can surely PR soon! > Add AWS Kinesis streaming connector > --- > > Key: FLINK-3211 > URL: https://issues.apache.org/jira/browse/FLINK-3211 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Original Estimate: 336h > Remaining Estimate: 336h > > AWS Kinesis is a widely adopted message queue used by AWS users, much like a > cloud service version of Apache Kafka. Support for AWS Kinesis will be a > great addition to the handful of Flink's streaming connectors to external > systems and a great reach out to the AWS community. > AWS supports two different ways to consume Kinesis data: with the low-level > AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK > can be used to consume Kinesis data, including stream read beginning from a > specific offset (or "record sequence number" in Kinesis terminology). On the > other hand, AWS officially recommends using KCL, which offers a higher-level > of abstraction that also comes with checkpointing and failure recovery by > using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state > storage. > However, KCL is essentially a stream processing library that wraps all the > partition-to-task (or "shard" in Kinesis terminology) determination and > checkpointing to allow the user to focus only on streaming application logic. > This leads to the understanding that we can not use the KCL to implement the > Kinesis streaming connector if we are aiming for a deep integration of Flink > with Kinesis that provides exactly once guarantees (KCL promises only > at-least-once). Therefore, AWS SDK will be the way to go for the > implementation of this feature. > With the ability to read from specific offsets, and also the fact that > Kinesis and Kafka share a lot of similarities, the basic principles of the > implementation of Flink's Kinesis streaming connector will very much resemble > the Kafka connector. We can basically follow the outlines described in > [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector > implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka > differences is described as following: > 1. While the Kafka connector can support reading from multiple topics, I > currently don't think this is a good idea for Kinesis streams (a Kinesis > Stream is logically equivalent to a Kafka topic). Kinesis streams can exist > in different AWS regions, and each Kinesis stream under the same AWS user > account may have completely independent access settings with different > authorization keys. Overall, a Kinesis stream feels like a much more > consolidated resource compared to Kafka topics. It would be great to hear > more thoughts on this part. > 2. While Kafka has brokers that can hold multiple partitions, the only > partitioning abstraction for AWS Kinesis is "shards". 
Therefore, in contrast > to the Kafka connector having per broker connections where the connections > can handle multiple Kafka partitions, the Kinesis connector will only need to > have simple per shard connections. > 3. Kinesis itself does not support committing offsets back to Kinesis. If we > were to implement this feature like the Kafka connector with Kafka / ZK to > sync outside view of progress, we probably could use ZK or DynamoDB like the > way KCL works. More thoughts on this part will be very helpful too. > As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis > Producer Library) [5]. However, for higher code consistency with the proposed > Kinesis Consumer, I think it will be better to stick with the AWS SDK for the > implementation. The implementation should be straight forward, being almost > if not completely the same as the Kafka sink. > References: > [1] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html > [2] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html > [3] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html > [4] http://data-artisans.com/kafka
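To make the SDK-based approach described above concrete, the sketch below shows how a single shard can be read from a specific sequence number with the low-level AWS SDK for Java. This is only an illustrative sketch, not the proposed connector implementation; the stream name, shard id, and starting sequence number are placeholder values, and error handling and deserialization are omitted.

```java
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.Record;

public class ShardReadSketch {

    public static void main(String[] args) throws InterruptedException {
        // Credentials are resolved from the default provider chain (env vars, profile, etc.).
        AmazonKinesisClient kinesis = new AmazonKinesisClient();

        // Resume reading a single shard right after a previously checkpointed sequence number.
        GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest()
            .withStreamName("my-stream")                 // placeholder stream name
            .withShardId("shardId-000000000000")         // placeholder shard id
            .withShardIteratorType("AFTER_SEQUENCE_NUMBER")
            .withStartingSequenceNumber("0");            // placeholder checkpointed sequence number

        String shardIterator = kinesis.getShardIterator(iteratorRequest).getShardIterator();

        while (shardIterator != null) {
            GetRecordsResult result = kinesis.getRecords(
                new GetRecordsRequest().withShardIterator(shardIterator).withLimit(100));

            for (Record record : result.getRecords()) {
                // record.getData() is a ByteBuffer; a connector would pass it to a deserialization schema.
                System.out.println(record.getSequenceNumber());
            }

            shardIterator = result.getNextShardIterator();
            Thread.sleep(1000); // stay under the per-shard GetRecords rate limit
        }
    }
}
```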
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206703#comment-15206703 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57018050 --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala --- @@ -115,7 +115,7 @@ class ApplicationClient( val jobManager = context.actorSelection(jobManagerAkkaURL) jobManager ! decorateMessage( -RegisterApplicationClient +RegisterInfoMessageListener.get() --- End diff -- `getInstance`? > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
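For context, the review comment above is about the name of the static accessor on singleton message classes. A minimal sketch of the pattern under discussion (a hypothetical class, not the actual Flink code) could look like this:

```java
import java.io.Serializable;

/**
 * Hypothetical singleton message, sketching the pattern the review comment refers to:
 * a stateless message exchanged between actors, exposed through a static accessor.
 */
public final class RegisterInfoMessageListenerSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private static final RegisterInfoMessageListenerSketch INSTANCE =
        new RegisterInfoMessageListenerSketch();

    private RegisterInfoMessageListenerSketch() {
        // no state; all instances are interchangeable
    }

    /** The accessor name suggested in the review ("getInstance" rather than "get"). */
    public static RegisterInfoMessageListenerSketch getInstance() {
        return INSTANCE;
    }

    /** Preserve the singleton property across Java serialization. */
    private Object readResolve() {
        return INSTANCE;
    }
}
```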
[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206699#comment-15206699 ] Roy Ben-Alta commented on FLINK-3211: - Hi Robert, I am the Business Development Manager for Amazon Kinesis and it will be great to have Apache Flink connector to Amazon Kinesis.Please send me email to benal...@amazon.com and we can chat over emails. Will be happy to provide you useful collateral in regards to Amazon Kinesis streams. > Add AWS Kinesis streaming connector > --- > > Key: FLINK-3211 > URL: https://issues.apache.org/jira/browse/FLINK-3211 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Original Estimate: 336h > Remaining Estimate: 336h > > AWS Kinesis is a widely adopted message queue used by AWS users, much like a > cloud service version of Apache Kafka. Support for AWS Kinesis will be a > great addition to the handful of Flink's streaming connectors to external > systems and a great reach out to the AWS community. > AWS supports two different ways to consume Kinesis data: with the low-level > AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK > can be used to consume Kinesis data, including stream read beginning from a > specific offset (or "record sequence number" in Kinesis terminology). On the > other hand, AWS officially recommends using KCL, which offers a higher-level > of abstraction that also comes with checkpointing and failure recovery by > using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state > storage. > However, KCL is essentially a stream processing library that wraps all the > partition-to-task (or "shard" in Kinesis terminology) determination and > checkpointing to allow the user to focus only on streaming application logic. > This leads to the understanding that we can not use the KCL to implement the > Kinesis streaming connector if we are aiming for a deep integration of Flink > with Kinesis that provides exactly once guarantees (KCL promises only > at-least-once). Therefore, AWS SDK will be the way to go for the > implementation of this feature. > With the ability to read from specific offsets, and also the fact that > Kinesis and Kafka share a lot of similarities, the basic principles of the > implementation of Flink's Kinesis streaming connector will very much resemble > the Kafka connector. We can basically follow the outlines described in > [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector > implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka > differences is described as following: > 1. While the Kafka connector can support reading from multiple topics, I > currently don't think this is a good idea for Kinesis streams (a Kinesis > Stream is logically equivalent to a Kafka topic). Kinesis streams can exist > in different AWS regions, and each Kinesis stream under the same AWS user > account may have completely independent access settings with different > authorization keys. Overall, a Kinesis stream feels like a much more > consolidated resource compared to Kafka topics. It would be great to hear > more thoughts on this part. > 2. While Kafka has brokers that can hold multiple partitions, the only > partitioning abstraction for AWS Kinesis is "shards". 
Therefore, in contrast > to the Kafka connector having per broker connections where the connections > can handle multiple Kafka partitions, the Kinesis connector will only need to > have simple per shard connections. > 3. Kinesis itself does not support committing offsets back to Kinesis. If we > were to implement this feature like the Kafka connector with Kafka / ZK to > sync outside view of progress, we probably could use ZK or DynamoDB like the > way KCL works. More thoughts on this part will be very helpful too. > As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis > Producer Library) [5]. However, for higher code consistency with the proposed > Kinesis Consumer, I think it will be better to stick with the AWS SDK for the > implementation. The implementation should be straight forward, being almost > if not completely the same as the Kafka sink. > References: > [1] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html > [2] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html > [3] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html > [4] http://data-artisans.com/kafka-flink-a-practical-how-to/ > [5] > http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998 -- This message was sent
[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206716#comment-15206716 ] Roy Ben-Alta commented on FLINK-3211: - Hi Tzu, There is no Kinesis local exist but there is one I found on Git: https://github.com/mhart/kinesalite However, you should not have issue to test it on AWS by using Kinesis API (You can create stream with x number of shards and delete once your test are completed). Feel free to contact me at benaltar@amazon and I will be happy to provide you with some credits if needed for the sake of the test. Roy. > Add AWS Kinesis streaming connector > --- > > Key: FLINK-3211 > URL: https://issues.apache.org/jira/browse/FLINK-3211 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Original Estimate: 336h > Remaining Estimate: 336h > > AWS Kinesis is a widely adopted message queue used by AWS users, much like a > cloud service version of Apache Kafka. Support for AWS Kinesis will be a > great addition to the handful of Flink's streaming connectors to external > systems and a great reach out to the AWS community. > AWS supports two different ways to consume Kinesis data: with the low-level > AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK > can be used to consume Kinesis data, including stream read beginning from a > specific offset (or "record sequence number" in Kinesis terminology). On the > other hand, AWS officially recommends using KCL, which offers a higher-level > of abstraction that also comes with checkpointing and failure recovery by > using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state > storage. > However, KCL is essentially a stream processing library that wraps all the > partition-to-task (or "shard" in Kinesis terminology) determination and > checkpointing to allow the user to focus only on streaming application logic. > This leads to the understanding that we can not use the KCL to implement the > Kinesis streaming connector if we are aiming for a deep integration of Flink > with Kinesis that provides exactly once guarantees (KCL promises only > at-least-once). Therefore, AWS SDK will be the way to go for the > implementation of this feature. > With the ability to read from specific offsets, and also the fact that > Kinesis and Kafka share a lot of similarities, the basic principles of the > implementation of Flink's Kinesis streaming connector will very much resemble > the Kafka connector. We can basically follow the outlines described in > [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector > implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka > differences is described as following: > 1. While the Kafka connector can support reading from multiple topics, I > currently don't think this is a good idea for Kinesis streams (a Kinesis > Stream is logically equivalent to a Kafka topic). Kinesis streams can exist > in different AWS regions, and each Kinesis stream under the same AWS user > account may have completely independent access settings with different > authorization keys. Overall, a Kinesis stream feels like a much more > consolidated resource compared to Kafka topics. It would be great to hear > more thoughts on this part. > 2. While Kafka has brokers that can hold multiple partitions, the only > partitioning abstraction for AWS Kinesis is "shards". 
Therefore, in contrast > to the Kafka connector having per broker connections where the connections > can handle multiple Kafka partitions, the Kinesis connector will only need to > have simple per shard connections. > 3. Kinesis itself does not support committing offsets back to Kinesis. If we > were to implement this feature like the Kafka connector with Kafka / ZK to > sync outside view of progress, we probably could use ZK or DynamoDB like the > way KCL works. More thoughts on this part will be very helpful too. > As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis > Producer Library) [5]. However, for higher code consistency with the proposed > Kinesis Consumer, I think it will be better to stick with the AWS SDK for the > implementation. The implementation should be straight forward, being almost > if not completely the same as the Kafka sink. > References: > [1] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html > [2] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html > [3] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html > [4] http://data-artisans.com/kafka-flink-a-practical-how-to/ > [5] > h
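As a follow-up to the kinesalite pointer above, an integration test could reuse the same AWS SDK client and simply point it at a locally running kinesalite process instead of the real service. A minimal sketch, assuming kinesalite is listening on its default port 4567; the endpoint, credentials, and stream name are placeholders, and waiting for the stream to become ACTIVE is elided:

```java
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.CreateStreamRequest;

public class LocalKinesisSketch {

    public static void main(String[] args) {
        // kinesalite accepts any credentials; these are dummies for local testing only.
        AmazonKinesisClient kinesis =
            new AmazonKinesisClient(new BasicAWSCredentials("fakeAccessKey", "fakeSecretKey"));

        // Point the client at the local kinesalite endpoint instead of AWS.
        kinesis.setEndpoint("http://localhost:4567");

        // Create a short-lived test stream with a couple of shards ...
        kinesis.createStream(new CreateStreamRequest()
            .withStreamName("test-stream")
            .withShardCount(2));

        // ... run the test against "test-stream" ...

        // ... and delete it once the test completes.
        kinesis.deleteStream("test-stream");
    }
}
```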
[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57017514 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java --- @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.Props; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.yarn.messages.ContainersAllocated; +import org.apache.flink.yarn.messages.ContainersComplete; + +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import org.slf4j.Logger; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Specialized Flink Resource Manager implementation for YARN clusters. It is started as the + * YARN ApplicationMaster and implements the YARN-specific logic for container requests and failure + * monitoring. 
+ */ +public class YarnFlinkResourceManager extends FlinkResourceManager { + + /** The heartbeat interval while the resource master is waiting for containers */ + private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500; + + /** The default heartbeat interval during regular operation */ + private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000; + + /** The containers where a TaskManager is starting and we are waiting for it to register */ + private final Map containersInLaunch; + + /** Containers we have released, where we are waiting for an acknowledgement that +* they are released */ + private final Map containersBeingReturned; + + /** The YARN / Hadoop configuration object */ + private final YarnConfiguration yarnConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final ContaineredTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final ContainerLaunchContext taskManagerLaunchContext; + + /** Host name for the container running this process */ + private final String applicationMasterHostName; + + /** Web interface URL, may be null */ + private final String webInterfaceURL; + + /** Default heartbeat interval between this actor and the YARN resource manager */ + priva
[jira] [Commented] (FLINK-3653) recovery.zookeeper.storageDir is not documented on the configuration page
[ https://issues.apache.org/jira/browse/FLINK-3653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206705#comment-15206705 ] ASF GitHub Bot commented on FLINK-3653: --- GitHub user stefanobaghino opened a pull request: https://github.com/apache/flink/pull/1828 [FLINK-3653] recovery.zookeeper.storageDir is not documented on the configuration page Also: minor _hotfix_ to the `state.backend.fs.checkpointdir` option description (punctuation, casing and formatting, _boyscout rule_) You can merge this pull request into a Git repository by running: $ git pull https://github.com/radicalbit/flink 3653 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1828.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1828 commit 28f279ad70f1ac0316b770fab62a1c1568e0aac5 Author: Stefano Baghino Date: 2016-03-22T16:17:32Z [FLINK-3653] recovery.zookeeper.storageDir is not documented on the configuration page > recovery.zookeeper.storageDir is not documented on the configuration page > - > > Key: FLINK-3653 > URL: https://issues.apache.org/jira/browse/FLINK-3653 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.0.0 >Reporter: Stefano Baghino >Assignee: Stefano Baghino >Priority: Minor > Fix For: 1.1.0 > > > The {{recovery.zookeeper.storageDir}} option is documented in the HA page but > is missing from the configuration page. Since it's required for HA I think it > would be a good idea to have it on both pages. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
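For reference, the option being documented belongs to the ZooKeeper-based HA setup. A minimal flink-conf.yaml sketch is shown below; the quorum addresses and storage path are placeholders, and the key names follow the Flink 1.0.x HA documentation.

```yaml
# Hypothetical excerpt from conf/flink-conf.yaml for a ZooKeeper-backed HA setup.
recovery.mode: zookeeper
recovery.zookeeper.quorum: zkhost1:2181,zkhost2:2181,zkhost3:2181
# The option this ticket adds to the configuration page: the directory where
# JobManager metadata is persisted; ZooKeeper only stores a pointer to it.
recovery.zookeeper.storageDir: hdfs:///flink/recovery
```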
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206704#comment-15206704 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57018075 --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala --- @@ -148,7 +148,7 @@ class ApplicationClient( INITIAL_POLLING_DELAY, WAIT_FOR_YARN_INTERVAL, yarnJobManager.get, - decorateMessage(PollYarnClusterStatus)) + decorateMessage(GetClusterStatus.get())) --- End diff -- `getInstance`? > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57018050 --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala --- @@ -115,7 +115,7 @@ class ApplicationClient( val jobManager = context.actorSelection(jobManagerAkkaURL) jobManager ! decorateMessage( -RegisterApplicationClient +RegisterInfoMessageListener.get() --- End diff -- `getInstance`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3653] recovery.zookeeper.storageDir is ...
GitHub user stefanobaghino opened a pull request: https://github.com/apache/flink/pull/1828 [FLINK-3653] recovery.zookeeper.storageDir is not documented on the configuration page Also: minor _hotfix_ to the `state.backend.fs.checkpointdir` option description (punctuation, casing and formatting, _boyscout rule_) You can merge this pull request into a Git repository by running: $ git pull https://github.com/radicalbit/flink 3653 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1828.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1828 commit 28f279ad70f1ac0316b770fab62a1c1568e0aac5 Author: Stefano Baghino Date: 2016-03-22T16:17:32Z [FLINK-3653] recovery.zookeeper.storageDir is not documented on the configuration page --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57018075 --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala --- @@ -148,7 +148,7 @@ class ApplicationClient( INITIAL_POLLING_DELAY, WAIT_FOR_YARN_INTERVAL, yarnJobManager.get, - decorateMessage(PollYarnClusterStatus)) + decorateMessage(GetClusterStatus.get())) --- End diff -- `getInstance`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector
[ https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206677#comment-15206677 ] Robert Metzger commented on FLINK-3211: --- Hi Roy, I'll try to contact Tzu-Li via email to get an estimate when this is done. How urgently do you need the connector? Maybe I can allocate some time soon to get a first version into Flink. > Add AWS Kinesis streaming connector > --- > > Key: FLINK-3211 > URL: https://issues.apache.org/jira/browse/FLINK-3211 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Affects Versions: 1.0.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Original Estimate: 336h > Remaining Estimate: 336h > > AWS Kinesis is a widely adopted message queue used by AWS users, much like a > cloud service version of Apache Kafka. Support for AWS Kinesis will be a > great addition to the handful of Flink's streaming connectors to external > systems and a great reach out to the AWS community. > AWS supports two different ways to consume Kinesis data: with the low-level > AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK > can be used to consume Kinesis data, including stream read beginning from a > specific offset (or "record sequence number" in Kinesis terminology). On the > other hand, AWS officially recommends using KCL, which offers a higher-level > of abstraction that also comes with checkpointing and failure recovery by > using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state > storage. > However, KCL is essentially a stream processing library that wraps all the > partition-to-task (or "shard" in Kinesis terminology) determination and > checkpointing to allow the user to focus only on streaming application logic. > This leads to the understanding that we can not use the KCL to implement the > Kinesis streaming connector if we are aiming for a deep integration of Flink > with Kinesis that provides exactly once guarantees (KCL promises only > at-least-once). Therefore, AWS SDK will be the way to go for the > implementation of this feature. > With the ability to read from specific offsets, and also the fact that > Kinesis and Kafka share a lot of similarities, the basic principles of the > implementation of Flink's Kinesis streaming connector will very much resemble > the Kafka connector. We can basically follow the outlines described in > [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector > implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka > differences is described as following: > 1. While the Kafka connector can support reading from multiple topics, I > currently don't think this is a good idea for Kinesis streams (a Kinesis > Stream is logically equivalent to a Kafka topic). Kinesis streams can exist > in different AWS regions, and each Kinesis stream under the same AWS user > account may have completely independent access settings with different > authorization keys. Overall, a Kinesis stream feels like a much more > consolidated resource compared to Kafka topics. It would be great to hear > more thoughts on this part. > 2. While Kafka has brokers that can hold multiple partitions, the only > partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast > to the Kafka connector having per broker connections where the connections > can handle multiple Kafka partitions, the Kinesis connector will only need to > have simple per shard connections. > 3. 
Kinesis itself does not support committing offsets back to Kinesis. If we > were to implement this feature like the Kafka connector with Kafka / ZK to > sync outside view of progress, we probably could use ZK or DynamoDB like the > way KCL works. More thoughts on this part will be very helpful too. > As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis > Producer Library) [5]. However, for higher code consistency with the proposed > Kinesis Consumer, I think it will be better to stick with the AWS SDK for the > implementation. The implementation should be straight forward, being almost > if not completely the same as the Kafka sink. > References: > [1] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html > [2] > http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html > [3] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html > [4] http://data-artisans.com/kafka-flink-a-practical-how-to/ > [5] > http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3653) recovery.zookeeper.storageDir is not documented on the configuration page
Stefano Baghino created FLINK-3653: -- Summary: recovery.zookeeper.storageDir is not documented on the configuration page Key: FLINK-3653 URL: https://issues.apache.org/jira/browse/FLINK-3653 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.0.0 Reporter: Stefano Baghino Assignee: Stefano Baghino Priority: Minor Fix For: 1.1.0 The {{recovery.zookeeper.storageDir}} option is documented in the HA page but is missing from the configuration page. Since it's required for HA I think it would be a good idea to have it on both pages. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206658#comment-15206658 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57015042 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java --- @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Records; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedAction; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * This class is the executable entry point for the YARN application master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link YarnFlinkResourceManager}. 
+ * + * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container + * allocation and failure detection. + */ +public class YarnApplicationMasterRunner { + + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR_DIED_EXIT_CODE = 32; + + + // -
[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57015042 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java --- @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Records; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedAction; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * This class is the executable entry point for the YARN application master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link YarnFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container + * allocation and failure detection. 
+ */ +public class YarnApplicationMasterRunner { + + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR_DIED_EXIT_CODE = 32; + + + // + // Program entry point + // + + /** +* The entry point for the YARN application master. +* +* @param arg
[jira] [Commented] (FLINK-3547) Add support for streaming projection, selection, and union
[ https://issues.apache.org/jira/browse/FLINK-3547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206646#comment-15206646 ] ASF GitHub Bot commented on FLINK-3547: --- Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1820#issuecomment-199881323 Thanks for the review! I'll see how we can share the common parts between DataSet and DataStream translation. I will also open a JIRA for reworking the tests. > Add support for streaming projection, selection, and union > -- > > Key: FLINK-3547 > URL: https://issues.apache.org/jira/browse/FLINK-3547 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/1820#issuecomment-199881323 Thanks for the review! I'll see how we can share the common parts between DataSet and DataStream translation. I will also open a JIRA for reworking the tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-3652) Enable UnusedImports check for Scala checkstyle
Maximilian Michels created FLINK-3652: - Summary: Enable UnusedImports check for Scala checkstyle Key: FLINK-3652 URL: https://issues.apache.org/jira/browse/FLINK-3652 Project: Flink Issue Type: Improvement Components: Build System Reporter: Maximilian Michels Assignee: Maximilian Michels Priority: Minor For some reason, we don't have the UnusedImports check enabled in Scala checkstyle. This is not consistent with Java, where we strictly check for unused imports. I propose to enable it and fix any existing unused imports. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2445] Add tests for HadoopOutputFormats
Github user mliesenberg commented on the pull request: https://github.com/apache/flink/pull/1625#issuecomment-199878190 It's only the build job with the `hadoop.profile=1` option. For the others, the tests are passing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2445) Add tests for HadoopOutputFormats
[ https://issues.apache.org/jira/browse/FLINK-2445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206628#comment-15206628 ] ASF GitHub Bot commented on FLINK-2445: --- Github user mliesenberg commented on the pull request: https://github.com/apache/flink/pull/1625#issuecomment-199878190 It's only the build job with the `hadoop.profile=1` option. For the others, the tests are passing. > Add tests for HadoopOutputFormats > - > > Key: FLINK-2445 > URL: https://issues.apache.org/jira/browse/FLINK-2445 > Project: Flink > Issue Type: Test > Components: Hadoop Compatibility, Tests >Affects Versions: 0.9.1, 0.10.0 >Reporter: Fabian Hueske >Assignee: Martin Liesenberg > Labels: starter > > The HadoopOutputFormats and HadoopOutputFormatBase classes are not > sufficiently covered by unit tests. > We need tests that ensure that the methods of the wrapped Hadoop > OutputFormats are correctly called. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206625#comment-15206625 ] ASF GitHub Bot commented on FLINK-3544: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57011989 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java --- @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Records; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedAction; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * This class is the executable entry point for the YARN application master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link YarnFlinkResourceManager}. 
+ * + * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container + * allocation and failure detection. + */ +public class YarnApplicationMasterRunner { + + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR_DIED_EXIT_CODE = 32; + + + // --
[jira] [Commented] (FLINK-3547) Add support for streaming projection, selection, and union
[ https://issues.apache.org/jira/browse/FLINK-3547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206618#comment-15206618 ] ASF GitHub Bot commented on FLINK-3547: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1820#issuecomment-199877286 I think we should try to share more code between the DataSet and DataStream translation. Otherwise it looks good. We should also rework the tests and extract tests that check the Table API parts that are common for DataSet and DataStream. I wouldn't do that in this PR though. > Add support for streaming projection, selection, and union > -- > > Key: FLINK-3547 > URL: https://issues.apache.org/jira/browse/FLINK-3547 > Project: Flink > Issue Type: Sub-task > Components: Table API >Reporter: Vasia Kalavri >Assignee: Vasia Kalavri > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3544) ResourceManager runtime components
[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206622#comment-15206622 ] ASF GitHub Bot commented on FLINK-3544: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57011773 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -39,24 +39,32 @@ import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint._ import org.apache.flink.runtime.client._ import org.apache.flink.runtime.execution.SuppressRestartsException +import org.apache.flink.runtime.clusterframework.FlinkResourceManager +import org.apache.flink.runtime.clusterframework.messages._ +import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager +import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.{RestartStrategy, RestartStrategyFactory} import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex} -import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager} +import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, --- End diff -- `HardwareDescription` and `InstanceConnectionInfo` unused > ResourceManager runtime components > -- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)