[jira] [Commented] (FLINK-3657) Change access of DataSetUtils.countElements() to 'public'

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207953#comment-15207953
 ] 

ASF GitHub Bot commented on FLINK-3657:
---

GitHub user smarthi opened a pull request:

https://github.com/apache/flink/pull/1829

FLINK-3657: Change access of DataSetUtils.countElements() to 'public'



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smarthi/flink Flink-3657

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1829.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1829


commit d5ba6d90d3cdd6b93f962f2d9226e443810b08b0
Author: smarthi 
Date:   2016-03-23T06:28:28Z

FLINK-3657: Change access of DataSetUtils.countElements() to 'public'




> Change access of DataSetUtils.countElements() to 'public' 
> --
>
> Key: FLINK-3657
> URL: https://issues.apache.org/jira/browse/FLINK-3657
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Affects Versions: 1.0.0
>Reporter: Suneel Marthi
>Assignee: Suneel Marthi
>Priority: Minor
> Fix For: 1.0.1
>
>
> The access of DataSetUtils.countElements() is presently 'private'; change 
> it to 'public'. We happened to be replicating the functionality in our 
> project and realized the method already existed in Flink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3657: Change access of DataSetUtils.coun...

2016-03-22 Thread smarthi
GitHub user smarthi opened a pull request:

https://github.com/apache/flink/pull/1829

FLINK-3657: Change access of DataSetUtils.countElements() to 'public'



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smarthi/flink Flink-3657

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1829.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1829


commit d5ba6d90d3cdd6b93f962f2d9226e443810b08b0
Author: smarthi 
Date:   2016-03-23T06:28:28Z

FLINK-3657: Change access of DataSetUtils.countElements() to 'public'




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3657) Change access of DataSetUtils.countElements() to 'public'

2016-03-22 Thread Suneel Marthi (JIRA)
Suneel Marthi created FLINK-3657:


 Summary: Change access of DataSetUtils.countElements() to 'public' 
 Key: FLINK-3657
 URL: https://issues.apache.org/jira/browse/FLINK-3657
 Project: Flink
  Issue Type: Improvement
  Components: DataSet API
Affects Versions: 1.0.0
Reporter: Suneel Marthi
Assignee: Suneel Marthi
Priority: Minor
 Fix For: 1.0.1


The access of DataSetUtils.countElements() is presently 'private'; change it 
to 'public'. We happened to be replicating the functionality in our project 
and realized the method already existed in Flink.
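
For context, the replicated functionality can be sketched with the public DataSet
API alone, using mapPartition and the subtask index. The following is a minimal,
self-contained illustration of per-partition element counting, not the actual
DataSetUtils.countElements() implementation:

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class CountElementsSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> input = env.fromElements("a", "b", "c", "d");

        // Emit one (subtaskIndex, count) pair per parallel partition.
        DataSet<Tuple2<Integer, Long>> counts = input.mapPartition(
            new RichMapPartitionFunction<String, Tuple2<Integer, Long>>() {
                @Override
                public void mapPartition(Iterable<String> values, Collector<Tuple2<Integer, Long>> out) {
                    long count = 0;
                    for (String ignored : values) {
                        count++;
                    }
                    out.collect(new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), count));
                }
            });

        counts.print();
    }
}

Making the existing method public avoids exactly this kind of re-implementation in user projects.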



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2997) Support range partition with user customized data distribution.

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207407#comment-15207407
 ] 

ASF GitHub Bot commented on FLINK-2997:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1776#discussion_r57078308
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
+public class CustomDistributionITCase {
+
+    @Test
+    public void testPartitionWithDistribution1() throws Exception{
+        /*
+         * Test the record partitioned rightly with one field according to the customized data distribution
+         */
+
+        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+
+        DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
+        final TestDataDist dist = new TestDataDist(1);
+
+        env.setParallelism(dist.getParallelism());
+
+        DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0)
+            .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
+                @Override
+                public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
+                    int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+                    for (Tuple3<Integer, Long, String> s : values) {
+                        if ((s.f0 - 1) / 7 != partitionIndex) {
+                            fail("Record was not correctly partitioned: " + s.toString());
+                        }
+                    }
+                }
+            });
+
+        result.output(new DiscardingOutputFormat<Boolean>());
+        env.execute();
+    }
+
+    @Test
+    public void testRangeWithDistribution2() throws Exception{
+        /*
+         * Test the record partitioned rightly with two fields according to the customized data distribution
+         */
+
+        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+
+        DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
+        final TestDataDist dist = new TestDataDist(2);
+
+        env.setParallelism(dist.getParallelism());
+
+        DataSet<Boolean> result = DataSetUtils.partitionByRange(input1.map(
+            new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
+                @Override
+                public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
+                    return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
+                }
+            }), dist, 0, 1).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
+                @Override
+                public void mapPartit
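
The quoted diff is cut off before the TestDataDist class that the test relies on.
As a rough Java sketch of what a custom DataDistribution can look like (the fixed
boundary values and single-field layout are illustrative assumptions, not the
actual TestDataDist from the pull request):

import java.io.IOException;

import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

// Three range buckets over one integer key field, with fixed upper boundaries 7 and 14.
public class SimpleIntDistribution implements DataDistribution {

    private int[] boundaries = new int[] {7, 14};

    @Override
    public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
        // Upper boundary of the given bucket; the last, unbounded bucket needs no entry.
        return new Object[] { boundaries[bucketNum] };
    }

    @Override
    public int getNumberOfFields() {
        return 1;
    }

    @Override
    public void write(DataOutputView out) throws IOException {
        out.writeInt(boundaries.length);
        for (int boundary : boundaries) {
            out.writeInt(boundary);
        }
    }

    @Override
    public void read(DataInputView in) throws IOException {
        int len = in.readInt();
        boundaries = new int[len];
        for (int i = 0; i < len; i++) {
            boundaries[i] = in.readInt();
        }
    }
}

Such a distribution would then be passed to DataSetUtils.partitionByRange(input, dist, 0),
as in the test above.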

[GitHub] flink pull request: [FLINK-2997] Support range partition with user...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1776#discussion_r57078308
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.javaApiOperators;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
+public class CustomDistributionITCase {
+
+    @Test
+    public void testPartitionWithDistribution1() throws Exception{
+        /*
+         * Test the record partitioned rightly with one field according to the customized data distribution
+         */
+
+        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+
+        DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
+        final TestDataDist dist = new TestDataDist(1);
+
+        env.setParallelism(dist.getParallelism());
+
+        DataSet<Boolean> result = DataSetUtils.partitionByRange(input1, dist, 0)
+            .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
+                @Override
+                public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
+                    int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+                    for (Tuple3<Integer, Long, String> s : values) {
+                        if ((s.f0 - 1) / 7 != partitionIndex) {
+                            fail("Record was not correctly partitioned: " + s.toString());
+                        }
+                    }
+                }
+            });
+
+        result.output(new DiscardingOutputFormat<Boolean>());
+        env.execute();
+    }
+
+    @Test
+    public void testRangeWithDistribution2() throws Exception{
+        /*
+         * Test the record partitioned rightly with two fields according to the customized data distribution
+         */
+
+        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+
+        DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
+        final TestDataDist dist = new TestDataDist(2);
+
+        env.setParallelism(dist.getParallelism());
+
+        DataSet<Boolean> result = DataSetUtils.partitionByRange(input1.map(
+            new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Integer, String>>() {
+                @Override
+                public Tuple3<Integer, Integer, String> map(Tuple3<Integer, Long, String> value) throws Exception {
+                    return new Tuple3<>(value.f0, value.f1.intValue(), value.f2);
+                }
+            }), dist, 0, 1).mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
+                @Override
+                public void mapPartition(Iterable<Tuple3<Integer, Integer, String>> values, Collector<Boolean> out) throws Exception {
+                    int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+                    for (Tuple3<Integer, Integer, String> s : values) {
+

[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207374#comment-15207374
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57075247
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
 ---
@@ -87,5 +90,72 @@ class TableEnvironment {
 new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo)
   }
 
-}
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are used to name the Table fields.
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
+
+val (fieldNames, fieldIndexes) = 
TranslationContext.getFieldInfo[T](dataset.getType)
+val dataSetTable = new DataSetTable[T](
+  dataset,
+  fieldIndexes,
+  fieldNames
+)
+TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+  }
+
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are renamed to the given set of fields.
+   *
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   * @param fields the Table field names
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T], fields: 
String): Unit = {
+
+val exprs = ExpressionParser
+  .parseExpressionList(fields)
+  .toArray
+
+val (fieldNames, fieldIndexes) = 
TranslationContext.getFieldInfo[T](dataset.getType, exprs)
 
+val dataSetTable = new DataSetTable[T](
+  dataset,
+  fieldIndexes.toArray,
+  fieldNames.toArray
+)
+TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+  }
+
+  /**
+   * Registers a Table under a unique name, so that it can be used in SQL 
queries.
+   * @param name the Table name
+   * @param table the Table to register
+   */
+  def registerTable[T](name: String, table: Table): Unit = {
+val tableTable = new TableTable(table.getRelNode())
+TranslationContext.registerTable(tableTable, name)
+  }
+
+  /**
+   * Retrieve a registered Table.
+   * @param tableName the name under which the Table has been registered
+   * @return the Table object
+   */
+  @throws[TableException]
+  def scan(tableName: String): Table = {
+if (TranslationContext.isRegistered(tableName)) {
+  val relBuilder = TranslationContext.getRelBuilder
+  relBuilder.scan(tableName)
+  new Table(relBuilder.build(), relBuilder)
+}
+else {
+  throw new TableException("Table \"" + tableName + "\" was not found 
in the registry.")
--- End diff --

That's much nicer than my Java-ish way :S
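
To make the new API concrete, here is a hypothetical usage sketch of the methods this
diff adds to the Java Table API (registerDataSet with explicit field names, and scan).
The table name, field names, sample data, and import paths are assumptions made for
illustration against the code under review, not code from the pull request:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.table.Table;

public class RegisterAndScanSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        TableEnvironment tableEnv = new TableEnvironment();

        DataSet<Tuple3<Integer, Long, String>> orders = env.fromElements(
            new Tuple3<>(1, 10L, "beer"),
            new Tuple3<>(2, 5L, "diapers"));

        // Register the DataSet under a unique name and rename its fields.
        tableEnv.registerDataSet("Orders", orders, "id, quantity, product");

        // Look the table up again by name; scan() throws a TableException
        // if nothing was registered under that name.
        Table result = tableEnv.scan("Orders");
    }
}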


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57075247
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
 ---
@@ -87,5 +90,72 @@ class TableEnvironment {
 new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo)
   }
 
-}
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are used to name the Table fields.
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
+
+val (fieldNames, fieldIndexes) = 
TranslationContext.getFieldInfo[T](dataset.getType)
+val dataSetTable = new DataSetTable[T](
+  dataset,
+  fieldIndexes,
+  fieldNames
+)
+TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+  }
+
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are renamed to the given set of fields.
+   *
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   * @param fields the Table field names
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T], fields: 
String): Unit = {
+
+val exprs = ExpressionParser
+  .parseExpressionList(fields)
+  .toArray
+
+val (fieldNames, fieldIndexes) = 
TranslationContext.getFieldInfo[T](dataset.getType, exprs)
 
+val dataSetTable = new DataSetTable[T](
+  dataset,
+  fieldIndexes.toArray,
+  fieldNames.toArray
+)
+TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+  }
+
+  /**
+   * Registers a Table under a unique name, so that it can be used in SQL 
queries.
+   * @param name the Table name
+   * @param table the Table to register
+   */
+  def registerTable[T](name: String, table: Table): Unit = {
+val tableTable = new TableTable(table.getRelNode())
+TranslationContext.registerTable(tableTable, name)
+  }
+
+  /**
+   * Retrieve a registered Table.
+   * @param tableName the name under which the Table has been registered
+   * @return the Table object
+   */
+  @throws[TableException]
+  def scan(tableName: String): Table = {
+if (TranslationContext.isRegistered(tableName)) {
+  val relBuilder = TranslationContext.getRelBuilder
+  relBuilder.scan(tableName)
+  new Table(relBuilder.build(), relBuilder)
+}
+else {
+  throw new TableException("Table \"" + tableName + "\" was not found 
in the registry.")
--- End diff --

That's much nicer than my Java-ish way :S


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207368#comment-15207368
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57074776
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
 ---
@@ -72,5 +74,68 @@ class TableEnvironment {
  new ScalaBatchTranslator(config).translate[T](table.relNode)
   }
 
-}
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are used to name the Table fields.
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
--- End diff --

probably :)


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57074776
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
 ---
@@ -72,5 +74,68 @@ class TableEnvironment {
  new ScalaBatchTranslator(config).translate[T](table.relNode)
   }
 
-}
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are used to name the Table fields.
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
--- End diff --

probably :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207361#comment-15207361
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57073483
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import java.lang.Double
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Schema.TableType
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.{TranslatableTable, Statistic}
+import org.apache.calcite.util.ImmutableBitSet
+
+class TableTable(relNode: RelNode) extends AbstractTable with 
TranslatableTable {
+
+  override def getStatistic: Statistic = new DefaultTableStatistic
+
+  override def getJdbcTableType: TableType = ???
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = 
relNode.getRowType
+
+  override def toRel(context: ToRelContext, relOptTable: RelOptTable): 
RelNode = {
+relNode
+  }
+}
+
+class DefaultTableStatistic extends Statistic {
--- End diff --

Yes, I think you can but don't have to override it.


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57073483
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import java.lang.Double
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Schema.TableType
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.{TranslatableTable, Statistic}
+import org.apache.calcite.util.ImmutableBitSet
+
+class TableTable(relNode: RelNode) extends AbstractTable with 
TranslatableTable {
+
+  override def getStatistic: Statistic = new DefaultTableStatistic
+
+  override def getJdbcTableType: TableType = ???
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = 
relNode.getRowType
+
+  override def toRel(context: ToRelContext, relOptTable: RelOptTable): 
RelNode = {
+relNode
+  }
+}
+
+class DefaultTableStatistic extends Statistic {
--- End diff --

Yes, I think you can but don't have to override it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207336#comment-15207336
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57070386
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import java.lang.Double
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Schema.TableType
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.{TranslatableTable, Statistic}
+import org.apache.calcite.util.ImmutableBitSet
+
+class TableTable(relNode: RelNode) extends AbstractTable with 
TranslatableTable {
+
+  override def getStatistic: Statistic = new DefaultTableStatistic
+
+  override def getJdbcTableType: TableType = ???
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = 
relNode.getRowType
+
+  override def toRel(context: ToRelContext, relOptTable: RelOptTable): 
RelNode = {
+relNode
+  }
+}
+
+class DefaultTableStatistic extends Statistic {
--- End diff --

Shall I just leave the method unimplemented then?


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3644) WebRuntimeMonitor set java.io.tmpdir does not work for changing the upload dir.

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207335#comment-15207335
 ] 

ASF GitHub Bot commented on FLINK-3644:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1824#issuecomment-200032560
  
Hi @Astralidea, thanks for the PR. We are aware of some flaky tests. So 
there is no need to trigger new builds.


> WebRuntimeMonitor set java.io.tmpdir does not work for changing the upload dir.
> 
>
> Key: FLINK-3644
> URL: https://issues.apache.org/jira/browse/FLINK-3644
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.0.0
> Environment: flink-conf.yaml -> java.io.tmpdir: .
> java -server -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled 
> -XX:+CMSClassUnloadingEnabled -XX:+UseParNewGC -XX:+UseCompressedOops 
> -XX:+UseFastEmptyMethods -XX:+UseFastAccessorMethods -XX:+AlwaysPreTouch 
> -Xmx1707m -Dlog4j.configuration=file:log4j-mesos.properties 
> -Djava.io.tmpdir=. -cp 
> flink-dist_2.10-1.0.0.jar:log4j-1.2.17.jar:slf4j-log4j12-1.7.7.jar:flink-python_2.10-1.0.0.jar
> java version "1.8.0_60"
> Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
> Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
> CentOS release 6.4 (Final)
>Reporter: astralidea
>
> Setting java.io.tmpdir via flink-conf.yaml or -Djava.io.tmpdir=. does not work 
> for me, and I don't know why. Looking at the code, 
> System.getProperty("java.io.tmpdir") should work, but it does not. However, the 
> JobManager configuration shown in the web UI does show that java.io.tmpdir is set.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57070386
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import java.lang.Double
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Schema.TableType
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.{TranslatableTable, Statistic}
+import org.apache.calcite.util.ImmutableBitSet
+
+class TableTable(relNode: RelNode) extends AbstractTable with 
TranslatableTable {
+
+  override def getStatistic: Statistic = new DefaultTableStatistic
+
+  override def getJdbcTableType: TableType = ???
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = 
relNode.getRowType
+
+  override def toRel(context: ToRelContext, relOptTable: RelOptTable): 
RelNode = {
+relNode
+  }
+}
+
+class DefaultTableStatistic extends Statistic {
--- End diff --

Shall I just leave the method unimplemented then?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3644] WebRuntimeMonitor set java.io.tmpd...

2016-03-22 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1824#issuecomment-200032560
  
Hi @Astralidea, thanks for the PR. We are aware of some flaky tests. So 
there is no need to trigger new builds.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207328#comment-15207328
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57069968
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
 ---
@@ -59,29 +64,55 @@ object TranslationContext {
   .traitDefs(ConventionTraitDef.INSTANCE)
   .build
 
-tabNames = Map[AbstractTable, String]()
-
+tablesRegistry = Map[String, AbstractTable]()
 relBuilder = RelBuilder.create(frameworkConfig)
-
 nameCntr.set(0)
 
   }
 
   def addDataSet(newTable: DataSetTable[_]): String = {
+val tabName = "DataSetTable_" + nameCntr.getAndIncrement()
+tables.add(tabName, newTable)
+tabName
+  }
+
+  @throws[TableException]
+  def addAndRegisterDataSet(table: DataSetTable[_], name: String): Unit = {
--- End diff --

sounds right :)
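
The two registration paths in this diff (auto-generated names via addDataSet, explicit
unique names via addAndRegisterDataSet) boil down to a small registry pattern. A rough
Java illustration of that pattern follows; the class, the stored value type, and the
exception used here are stand-ins, not the actual Scala TranslationContext:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class TableRegistrySketch {

    private final Map<String, Object> tables = new HashMap<>();
    private final AtomicInteger nameCntr = new AtomicInteger(0);

    // Register under an auto-generated name and return that name.
    public String addDataSet(Object table) {
        String tabName = "DataSetTable_" + nameCntr.getAndIncrement();
        tables.put(tabName, table);
        return tabName;
    }

    // Register under a user-supplied name; duplicate names are rejected.
    public void addAndRegisterDataSet(Object table, String name) {
        if (tables.containsKey(name)) {
            throw new IllegalArgumentException("Table '" + name + "' is already registered.");
        }
        tables.put(name, table);
    }

    public boolean isRegistered(String name) {
        return tables.containsKey(name);
    }
}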


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57069968
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
 ---
@@ -59,29 +64,55 @@ object TranslationContext {
   .traitDefs(ConventionTraitDef.INSTANCE)
   .build
 
-tabNames = Map[AbstractTable, String]()
-
+tablesRegistry = Map[String, AbstractTable]()
 relBuilder = RelBuilder.create(frameworkConfig)
-
 nameCntr.set(0)
 
   }
 
   def addDataSet(newTable: DataSetTable[_]): String = {
+val tabName = "DataSetTable_" + nameCntr.getAndIncrement()
+tables.add(tabName, newTable)
+tabName
+  }
+
+  @throws[TableException]
+  def addAndRegisterDataSet(table: DataSetTable[_], name: String): Unit = {
--- End diff --

sounds right :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207306#comment-15207306
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1827#issuecomment-200025666
  
Thanks for the PR! Looks good overall. 
I think we can save a few LOCs by refactoring common parts a bit. Also I 
spotted more opportunities to use Scala's String building feature (`s""`) which 
is nicer than String concatenation, IMO.


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1827#issuecomment-200025666
  
Thanks for the PR! Looks good overall. 
I think we can save a few LOCs by refactoring common parts a bit. Also I 
spotted more opportunities to use Scala's String building feature (`s""`) which 
is nicer than String concatenation, IMO.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207298#comment-15207298
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57068281
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{TableException, Row}
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class RegisterDataSetITCase(
--- End diff --

If we move the common methods of the Java and Scala `TableEnvironment`s to 
an `AbstractTableEnvironment`, it is sufficient to only have the Scala tests, 
IMO.


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57068281
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/RegisterDataSetITCase.scala
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{TableException, Row}
+import org.apache.flink.api.table.plan.TranslationContext
+import org.apache.flink.api.table.test.utils.TableProgramsTestBase
+import 
org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class RegisterDataSetITCase(
--- End diff --

If we move the common methods of the Java and Scala `TableEnvironment`s to 
an `AbstractTableEnvironment`, it is sufficient to only have the Scala tests, 
IMO.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2909) Gelly Graph Generators

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207280#comment-15207280
 ] 

ASF GitHub Bot commented on FLINK-2909:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1807#discussion_r57067037
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.Before;
+
+public class AbstractGraphTest {
+
+   protected ExecutionEnvironment env;
+
+@Before
+public void setup() {
+   env = ExecutionEnvironment.createCollectionsEnvironment();
+   env.getConfig().disableSysoutLogging();
--- End diff --

I think this has no effect on a CollectionEnvironment.


> Gelly Graph Generators
> --
>
> Key: FLINK-2909
> URL: https://issues.apache.org/jira/browse/FLINK-2909
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Include a selection of graph generators in Gelly. Generated graphs will be 
> useful for performing scalability, stress, and regression testing as well as 
> benchmarking and comparing algorithms, for both Flink users and developers. 
> Generated data is infinitely scalable yet described by a few simple 
> parameters and can often substitute for user data or sharing large files when 
> reporting issues.
> There are multiple categories of graphs, as documented by 
> [NetworkX|https://networkx.github.io/documentation/latest/reference/generators.html]
>  and elsewhere.
> Graphs may be well-defined, i.e. the [Chvátal 
> graph|https://en.wikipedia.org/wiki/Chv%C3%A1tal_graph]. These may be 
> sufficiently small to populate locally.
> Graphs may be scalable, i.e. complete and star graphs. These should use 
> Flink's distributed parallelism.
> Graphs may be stochastic, i.e. [RMat 
> graphs|http://snap.stanford.edu/class/cs224w-readings/chakrabarti04rmat.pdf] 
> . A key consideration is that the graphs should source randomness from a 
> seedable PRNG and generate the same Graph regardless of parallelism.
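
The parallelism-independence requirement in the last point above can be sketched without
any of the generator classes from the pull request: derive each element's randomness from
the element id and a global seed rather than from the subtask, so the output is identical
for any parallelism. The seeding scheme and edge-selection logic below are illustrative
assumptions only:

import java.util.Random;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DeterministicEdgeSketch {

    public static void main(String[] args) throws Exception {
        final long seed = 42L;
        final long vertexCount = 100L;

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // One generated edge per vertex id; the id, not the subtask, seeds the PRNG,
        // so the same edges are produced regardless of parallelism.
        DataSet<Tuple2<Long, Long>> edges = env.generateSequence(0, vertexCount - 1)
            .map(new MapFunction<Long, Tuple2<Long, Long>>() {
                @Override
                public Tuple2<Long, Long> map(Long vertexId) {
                    Random rnd = new Random(seed ^ (vertexId * 0x9E3779B97F4A7C15L));
                    long target = Math.floorMod(rnd.nextLong(), vertexCount);
                    return new Tuple2<>(vertexId, target);
                }
            });

        edges.print();
    }
}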



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207277#comment-15207277
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57066925
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import java.lang.Double
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Schema.TableType
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.{TranslatableTable, Statistic}
+import org.apache.calcite.util.ImmutableBitSet
+
+class TableTable(relNode: RelNode) extends AbstractTable with 
TranslatableTable {
+
+  override def getStatistic: Statistic = new DefaultTableStatistic
+
+  override def getJdbcTableType: TableType = ???
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = 
relNode.getRowType
+
+  override def toRel(context: ToRelContext, relOptTable: RelOptTable): 
RelNode = {
+relNode
+  }
+}
+
+class DefaultTableStatistic extends Statistic {
--- End diff --

Do not provide statistics here. It might override valid statistics that 
Calcite computed from the relational expression that the Table represents.


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2909] [Gelly] Graph generators

2016-03-22 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1807#discussion_r57067037
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.Before;
+
+public class AbstractGraphTest {
+
+   protected ExecutionEnvironment env;
+
+@Before
+public void setup() {
+   env = ExecutionEnvironment.createCollectionsEnvironment();
+   env.getConfig().disableSysoutLogging();
--- End diff --

I think this has no effect on a CollectionEnvironment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57066925
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import java.lang.Double
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Schema.TableType
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.{TranslatableTable, Statistic}
+import org.apache.calcite.util.ImmutableBitSet
+
+class TableTable(relNode: RelNode) extends AbstractTable with 
TranslatableTable {
+
+  override def getStatistic: Statistic = new DefaultTableStatistic
+
+  override def getJdbcTableType: TableType = ???
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = 
relNode.getRowType
+
+  override def toRel(context: ToRelContext, relOptTable: RelOptTable): 
RelNode = {
+relNode
+  }
+}
+
+class DefaultTableStatistic extends Statistic {
--- End diff --

Do not provide statistics here. It might override valid statistics that 
Calcite computed from the relational expression that the Table represents.
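
One way to act on this, sketched here under the assumption that falling back to Calcite's `Statistics.UNKNOWN` is the intended behaviour, is to drop the custom statistic entirely so the planner can derive estimates from the wrapped `RelNode`:

```scala
// Hedged sketch only; class and import names mirror the diff above.
import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.plan.RelOptTable.ToRelContext
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.schema.{Statistic, Statistics, TranslatableTable}

class RelNodeTableSketch(relNode: RelNode) extends AbstractTable with TranslatableTable {

  // Equivalent to not overriding getStatistic at all (AbstractTable's default),
  // so statistics Calcite computes from the RelNode are not overridden here.
  override def getStatistic: Statistic = Statistics.UNKNOWN

  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType =
    relNode.getRowType

  override def toRel(context: ToRelContext, relOptTable: RelOptTable): RelNode =
    relNode
}
```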


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207269#comment-15207269
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57066656
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import java.lang.Double
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Schema.TableType
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.{TranslatableTable, Statistic}
+import org.apache.calcite.util.ImmutableBitSet
+
+class TableTable(relNode: RelNode) extends AbstractTable with 
TranslatableTable {
--- End diff --

Add a brief comment explaining what this class is about. The name might be confusing.
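
For illustration only, a class comment along these lines would address the point (the wording is a guess at the intent, not the author's):

```scala
/**
 * Wraps the [[RelNode]] of an already constructed Table as a Calcite
 * [[org.apache.calcite.schema.impl.AbstractTable]], so that the Table can be
 * registered under a name and scanned again from SQL queries.
 */
```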


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57066656
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableTable.scala
 ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.schema
+
+import java.lang.Double
+import java.util
+import java.util.Collections
+
+import org.apache.calcite.plan.RelOptTable
+import org.apache.calcite.plan.RelOptTable.ToRelContext
+import org.apache.calcite.rel.{RelDistribution, RelCollation, RelNode}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.schema.Schema.TableType
+import org.apache.calcite.schema.impl.AbstractTable
+import org.apache.calcite.schema.{TranslatableTable, Statistic}
+import org.apache.calcite.util.ImmutableBitSet
+
+class TableTable(relNode: RelNode) extends AbstractTable with 
TranslatableTable {
--- End diff --

Add a brief comment explaining what this class is about. The name might be confusing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207264#comment-15207264
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57066078
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
 ---
@@ -72,5 +74,68 @@ class TableEnvironment {
  new ScalaBatchTranslator(config).translate[T](table.relNode)
   }
 
-}
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are used to name the Table fields.
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
--- End diff --

Does it make sense to have an `AbstractTableEnvironment` for the common 
parts of the Java and the Scala `TableEnvironment`s?
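
If that split is made, one possible shape is sketched below; the import paths for `Table` and `TableException` are assumptions, while the `TranslationContext`/`TableTable` usage mirrors the methods shown in this PR:

```scala
import org.apache.flink.api.table.{Table, TableException}
import org.apache.flink.api.table.plan.TranslationContext
import org.apache.flink.api.table.plan.schema.TableTable

// Hedged sketch of a shared base class; the Java and Scala TableEnvironments would
// then only keep the registerDataSet variants whose signatures differ.
abstract class AbstractTableEnvironment {

  /** Registers a Table under a unique name so that it can be used in SQL queries. */
  def registerTable(name: String, table: Table): Unit = {
    val tableTable = new TableTable(table.getRelNode())
    TranslationContext.registerTable(tableTable, name)
  }

  /** Retrieves a registered Table or fails with a TableException. */
  @throws[TableException]
  def scan(tableName: String): Table = {
    if (!TranslationContext.isRegistered(tableName)) {
      throw new TableException(s"""Table "$tableName" was not found in the registry.""")
    }
    val relBuilder = TranslationContext.getRelBuilder
    relBuilder.scan(tableName)
    new Table(relBuilder.build(), relBuilder)
  }
}
```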


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57066078
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
 ---
@@ -72,5 +74,68 @@ class TableEnvironment {
  new ScalaBatchTranslator(config).translate[T](table.relNode)
   }
 
-}
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are used to name the Table fields.
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
--- End diff --

Does it make sense to have an `AbstractTableEnvironment` for the common 
parts of the Java and the Scala `TableEnvironment`s?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207262#comment-15207262
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57065868
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
 ---
@@ -72,5 +74,68 @@ class TableEnvironment {
  new ScalaBatchTranslator(config).translate[T](table.relNode)
   }
 
-}
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are used to name the Table fields.
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
+
+val (fieldNames, fieldIndexes) = 
TranslationContext.getFieldInfo[T](dataset.getType)
+val dataSetTable = new DataSetTable[T](
+  dataset.javaSet,
+  fieldIndexes,
+  fieldNames
+)
+TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+  }
+
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are renamed to the given set of fields.
+   *
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   * @param fields the field names expression
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T], fields: 
Expression*): Unit = {
+
+val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](
+  dataset.getType, fields.toArray)
 
+val dataSetTable = new DataSetTable[T](
+  dataset.javaSet,
+  fieldIndexes.toArray,
+  fieldNames.toArray
+)
+TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+  }
+
+  /**
+   * Registers a Table under a unique name, so that it can be used in SQL 
queries.
+   * @param name the Table name
+   * @param table the Table to register
+   */
+  def registerTable[T](name: String, table: Table): Unit = {
+val tableTable = new TableTable(table.getRelNode())
+TranslationContext.registerTable(tableTable, name)
+  }
+
+  /**
+   * Retrieve a registered Table.
+   * @param tableName the name under which the Table has been registered
+   * @return the Table object
+   */
+  @throws[TableException]
+  def scan(tableName: String): Table = {
+if (TranslationContext.isRegistered(tableName)) {
+  val relBuilder = TranslationContext.getRelBuilder
+  relBuilder.scan(tableName)
+  new Table(relBuilder.build(), relBuilder)
+}
+else {
+  throw new TableException("Table \"" + tableName + "\" was not found 
in the registry.")
--- End diff --

Use Scala's `s""` string interpolation here as well.


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57065868
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableEnvironment.scala
 ---
@@ -72,5 +74,68 @@ class TableEnvironment {
  new ScalaBatchTranslator(config).translate[T](table.relNode)
   }
 
-}
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are used to name the Table fields.
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
+
+val (fieldNames, fieldIndexes) = 
TranslationContext.getFieldInfo[T](dataset.getType)
+val dataSetTable = new DataSetTable[T](
+  dataset.javaSet,
+  fieldIndexes,
+  fieldNames
+)
+TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+  }
+
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are renamed to the given set of fields.
+   *
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   * @param fields the field names expression
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T], fields: 
Expression*): Unit = {
+
+val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo[T](
+  dataset.getType, fields.toArray)
 
+val dataSetTable = new DataSetTable[T](
+  dataset.javaSet,
+  fieldIndexes.toArray,
+  fieldNames.toArray
+)
+TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+  }
+
+  /**
+   * Registers a Table under a unique name, so that it can be used in SQL 
queries.
+   * @param name the Table name
+   * @param table the Table to register
+   */
+  def registerTable[T](name: String, table: Table): Unit = {
+val tableTable = new TableTable(table.getRelNode())
+TranslationContext.registerTable(tableTable, name)
+  }
+
+  /**
+   * Retrieve a registered Table.
+   * @param tableName the name under which the Table has been registered
+   * @return the Table object
+   */
+  @throws[TableException]
+  def scan(tableName: String): Table = {
+if (TranslationContext.isRegistered(tableName)) {
+  val relBuilder = TranslationContext.getRelBuilder
+  relBuilder.scan(tableName)
+  new Table(relBuilder.build(), relBuilder)
+}
+else {
+  throw new TableException("Table \"" + tableName + "\" was not found 
in the registry.")
--- End diff --

Use Scala's `s""` string interpolation here as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207261#comment-15207261
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57065751
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
 ---
@@ -87,5 +90,72 @@ class TableEnvironment {
 new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo)
   }
 
-}
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are used to name the Table fields.
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
+
+val (fieldNames, fieldIndexes) = 
TranslationContext.getFieldInfo[T](dataset.getType)
+val dataSetTable = new DataSetTable[T](
+  dataset,
+  fieldIndexes,
+  fieldNames
+)
+TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+  }
+
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are renamed to the given set of fields.
+   *
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   * @param fields the Table field names
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T], fields: 
String): Unit = {
+
+val exprs = ExpressionParser
+  .parseExpressionList(fields)
+  .toArray
+
+val (fieldNames, fieldIndexes) = 
TranslationContext.getFieldInfo[T](dataset.getType, exprs)
 
+val dataSetTable = new DataSetTable[T](
+  dataset,
+  fieldIndexes.toArray,
+  fieldNames.toArray
+)
+TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+  }
+
+  /**
+   * Registers a Table under a unique name, so that it can be used in SQL 
queries.
+   * @param name the Table name
+   * @param table the Table to register
+   */
+  def registerTable[T](name: String, table: Table): Unit = {
+val tableTable = new TableTable(table.getRelNode())
+TranslationContext.registerTable(tableTable, name)
+  }
+
+  /**
+   * Retrieve a registered Table.
+   * @param tableName the name under which the Table has been registered
+   * @return the Table object
+   */
+  @throws[TableException]
+  def scan(tableName: String): Table = {
+if (TranslationContext.isRegistered(tableName)) {
+  val relBuilder = TranslationContext.getRelBuilder
+  relBuilder.scan(tableName)
+  new Table(relBuilder.build(), relBuilder)
+}
+else {
+  throw new TableException("Table \"" + tableName + "\" was not found 
in the registry.")
--- End diff --

Strings can be built with Scala string interpolation like this: `s"Table \"$tableName\" was not found in the registry."`


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57065751
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/TableEnvironment.scala
 ---
@@ -87,5 +90,72 @@ class TableEnvironment {
 new JavaBatchTranslator(config).translate[T](table.relNode)(typeInfo)
   }
 
-}
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are used to name the Table fields.
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T]): Unit = {
+
+val (fieldNames, fieldIndexes) = 
TranslationContext.getFieldInfo[T](dataset.getType)
+val dataSetTable = new DataSetTable[T](
+  dataset,
+  fieldIndexes,
+  fieldNames
+)
+TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+  }
+
+  /**
+   * Registers a DataSet under a unique name, so that it can be used in 
SQL queries.
+   * The fields of the DataSet type are renamed to the given set of fields.
+   *
+   * @param name the Table name
+   * @param dataset the DataSet to register
+   * @param fields the Table field names
+   */
+  def registerDataSet[T](name: String, dataset: DataSet[T], fields: 
String): Unit = {
+
+val exprs = ExpressionParser
+  .parseExpressionList(fields)
+  .toArray
+
+val (fieldNames, fieldIndexes) = 
TranslationContext.getFieldInfo[T](dataset.getType, exprs)
 
+val dataSetTable = new DataSetTable[T](
+  dataset,
+  fieldIndexes.toArray,
+  fieldNames.toArray
+)
+TranslationContext.addAndRegisterDataSet(dataSetTable, name)
+  }
+
+  /**
+   * Registers a Table under a unique name, so that it can be used in SQL 
queries.
+   * @param name the Table name
+   * @param table the Table to register
+   */
+  def registerTable[T](name: String, table: Table): Unit = {
+val tableTable = new TableTable(table.getRelNode())
+TranslationContext.registerTable(tableTable, name)
+  }
+
+  /**
+   * Retrieve a registered Table.
+   * @param tableName the name under which the Table has been registered
+   * @return the Table object
+   */
+  @throws[TableException]
+  def scan(tableName: String): Table = {
+if (TranslationContext.isRegistered(tableName)) {
+  val relBuilder = TranslationContext.getRelBuilder
+  relBuilder.scan(tableName)
+  new Table(relBuilder.build(), relBuilder)
+}
+else {
+  throw new TableException("Table \"" + tableName + "\" was not found 
in the registry.")
--- End diff --

Strings can be built with Scala string interpolation like this: `s"Table \"$tableName\" was not found in the registry."`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207254#comment-15207254
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57065169
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
 ---
@@ -93,7 +93,10 @@ object FlinkRuleSets {
 DataSetCalcRule.INSTANCE,
 DataSetJoinRule.INSTANCE,
 DataSetScanRule.INSTANCE,
-DataSetUnionRule.INSTANCE
+DataSetUnionRule.INSTANCE,
+
+// convert a logical table scan to a relational expression
+TableScanRule.INSTANCE
--- End diff --

Can we move this rule to the top, next to the other default Calcite rules?


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57065169
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
 ---
@@ -93,7 +93,10 @@ object FlinkRuleSets {
 DataSetCalcRule.INSTANCE,
 DataSetJoinRule.INSTANCE,
 DataSetScanRule.INSTANCE,
-DataSetUnionRule.INSTANCE
+DataSetUnionRule.INSTANCE,
+
+// convert a logical table scan to a relational expression
+TableScanRule.INSTANCE
--- End diff --

Can we move this rule to the top, next to the other default Calcite rules?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207253#comment-15207253
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57065041
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
 ---
@@ -59,29 +64,55 @@ object TranslationContext {
   .traitDefs(ConventionTraitDef.INSTANCE)
   .build
 
-tabNames = Map[AbstractTable, String]()
-
+tablesRegistry = Map[String, AbstractTable]()
 relBuilder = RelBuilder.create(frameworkConfig)
-
 nameCntr.set(0)
 
   }
 
   def addDataSet(newTable: DataSetTable[_]): String = {
--- End diff --

Refactor to `registerTable(AbstractTable)` as well? 


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3639) Add methods and utilities to register DataSets and Tables in the TableEnvironment

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207249#comment-15207249
 ] 

ASF GitHub Bot commented on FLINK-3639:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57064833
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
 ---
@@ -59,29 +64,55 @@ object TranslationContext {
   .traitDefs(ConventionTraitDef.INSTANCE)
   .build
 
-tabNames = Map[AbstractTable, String]()
-
+tablesRegistry = Map[String, AbstractTable]()
 relBuilder = RelBuilder.create(frameworkConfig)
-
 nameCntr.set(0)
 
   }
 
   def addDataSet(newTable: DataSetTable[_]): String = {
+val tabName = "DataSetTable_" + nameCntr.getAndIncrement()
+tables.add(tabName, newTable)
+tabName
+  }
+
+  @throws[TableException]
+  def addAndRegisterDataSet(table: DataSetTable[_], name: String): Unit = {
--- End diff --

I think `addAndRegisterDataSet(DataSetTable, String)` and 
`registerTable(TableTable, String)` can be combined into a single 
`registerTable(AbstractTable, String)`.
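
A rough sketch of what the combined method inside `TranslationContext` could look like; the duplicate-name check and the `tablesRegistry`/`tables` bookkeeping are assumptions based on the code visible in this diff:

```scala
@throws[TableException]
def registerTable(table: AbstractTable, name: String): Unit = {
  if (isRegistered(name)) {
    throw new TableException(s"Table '$name' is already registered.")
  }
  // keep the name -> table registry and the schema seen by the RelBuilder in sync
  tablesRegistry += (name -> table)
  tables.add(name, table)
}
```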


> Add methods and utilities to register DataSets and Tables in the 
> TableEnvironment
> -
>
> Key: FLINK-3639
> URL: https://issues.apache.org/jira/browse/FLINK-3639
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>
> In order to make tables queryable from SQL we need to register them under a 
> unique name in the TableEnvironment.
> [This design 
> document|https://docs.google.com/document/d/1sITIShmJMGegzAjGqFuwiN_iw1urwykKsLiacokxSw0/edit]
>  describes the proposed API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57065041
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
 ---
@@ -59,29 +64,55 @@ object TranslationContext {
   .traitDefs(ConventionTraitDef.INSTANCE)
   .build
 
-tabNames = Map[AbstractTable, String]()
-
+tablesRegistry = Map[String, AbstractTable]()
 relBuilder = RelBuilder.create(frameworkConfig)
-
 nameCntr.set(0)
 
   }
 
   def addDataSet(newTable: DataSetTable[_]): String = {
--- End diff --

Refactor to `registerTable(AbstractTable)` as well? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3639] add methods for registering datas...

2016-03-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1827#discussion_r57064833
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
 ---
@@ -59,29 +64,55 @@ object TranslationContext {
   .traitDefs(ConventionTraitDef.INSTANCE)
   .build
 
-tabNames = Map[AbstractTable, String]()
-
+tablesRegistry = Map[String, AbstractTable]()
 relBuilder = RelBuilder.create(frameworkConfig)
-
 nameCntr.set(0)
 
   }
 
   def addDataSet(newTable: DataSetTable[_]): String = {
+val tabName = "DataSetTable_" + nameCntr.getAndIncrement()
+tables.add(tabName, newTable)
+tabName
+  }
+
+  @throws[TableException]
+  def addAndRegisterDataSet(table: DataSetTable[_], name: String): Unit = {
--- End diff --

I think `addAndRegisterDataSet(DataSetTable, String)` and 
`registerTable(TableTable, String)` can be combined into a single 
`registerTable(AbstractTable, String)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3656) Rework TableAPI tests

2016-03-22 Thread Vasia Kalavri (JIRA)
Vasia Kalavri created FLINK-3656:


 Summary: Rework TableAPI tests
 Key: FLINK-3656
 URL: https://issues.apache.org/jira/browse/FLINK-3656
 Project: Flink
  Issue Type: Improvement
  Components: Table API
Reporter: Vasia Kalavri


We should look into whether we can rework the Table API tests to extract the checks 
for Table API parts that are common to DataSet and DataStream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3547) Add support for streaming projection, selection, and union

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207188#comment-15207188
 ] 

ASF GitHub Bot commented on FLINK-3547:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1820#issuecomment-19747
  
I have updated the PR. Thanks!


> Add support for streaming projection, selection, and union
> --
>
> Key: FLINK-3547
> URL: https://issues.apache.org/jira/browse/FLINK-3547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-22 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1820#issuecomment-19747
  
I have updated the PR. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat

2016-03-22 Thread Gna Phetsarath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gna Phetsarath updated FLINK-3655:
--
Description: 
Allow comma-separated or multiple directories to be specified for 
FileInputFormat so that a DataSource will process the directories sequentially.

   
env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")

in Scala

   env.readFile(paths: Seq[String])
or 
  env.readFile(path: String, otherPaths: String*)

Wildcard support would be a bonus.

  was:
Allow comma-separated directories to be specified for FileInputFormat so that a 
DataSource will process the directories sequentially.

   
env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")

in Scala

   env.readFile(paths: Seq[String])
or 
  env.readFile(path: String, otherPaths: String*)

Wildcard support would be a bonus.


> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Priority: Minor
>
> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat so that a DataSource will process the directories 
> sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.
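
Until such an API exists, a workaround with the current API is to declare one source per directory and union them; this is only a sketch with made-up paths, and it does not guarantee that the directories are processed strictly sequentially:

```scala
import org.apache.flink.api.scala._

// Hedged sketch: one readTextFile source per directory, unioned into a single DataSet.
val env = ExecutionEnvironment.getExecutionEnvironment
val paths = Seq("/data/2016/01/01", "/data/2016/01/02", "/data/2016/01/03")

val combined: DataSet[String] = paths
  .map(dir => env.readTextFile(dir))
  .reduce((a, b) => a.union(b))
```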



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat

2016-03-22 Thread Gna Phetsarath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gna Phetsarath updated FLINK-3655:
--
Summary: Allow comma-separated or multiple directories to be specified for 
FileInputFormat  (was: Allow comma-separated directories to be specified for 
FileInputFormat)

> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Priority: Minor
>
> Allow comma-separated directories to be specified for FileInputFormat so that 
> a DataSource will process the directories sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3655) Allow comma-separated directories to be specified for FileInputFormat

2016-03-22 Thread Gna Phetsarath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gna Phetsarath updated FLINK-3655:
--
Description: 
Allow comma-separated directories to be specified for FileInputFormat so that a 
DataSource will process the directories sequentially.

   
env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")

in Scala

   env.readFile(paths: Seq[String])
or 
  env.readFile(path: String, otherPaths: String*)

Wildcard support would be a bonus.

  was:
Allow comma-separated directories to be specified for FileInputFormat so that a 
DataSource will process the directories sequentially.

env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")}}

Wildcard support would be a bonus.


> Allow comma-separated directories to be specified for FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Priority: Minor
>
> Allow comma-separated directories to be specified for FileInputFormat so that 
> a DataSource will process the directories sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3613) Add standard deviation, mean, variance to list of Aggregations

2016-03-22 Thread Todd Lisonbee (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207054#comment-15207054
 ] 

Todd Lisonbee edited comment on FLINK-3613 at 3/22/16 7:02 PM:
---

Attached is a design for the improvements to DataSet.aggregate() needed to 
implement additional aggregations like standard deviation.

To keep the public APIs stable, the best path seems to be having 
AggregateOperator implement CustomUnaryOperation, but that feels odd because 
no other Operator is implemented that way. The other options I see aren't 
consistent with the existing Operators either.

I could really use some feedback on this. Thanks.

Also, should I be posting this to the dev mailing list?


was (Author: tlisonbee):
Attached is a design for the improvements to DataSet.aggregate() needed to 
implement additional aggregations like standard deviation.

To keep the public APIs stable, the best path seems to be having 
AggregateOperator implement CustomUnaryOperation, but that feels odd because 
no other Operator is implemented that way. The other options I see aren't 
consistent with the existing Operators either.

I could really use some feedback on this. Thanks.

> Add standard deviation, mean, variance to list of Aggregations
> --
>
> Key: FLINK-3613
> URL: https://issues.apache.org/jira/browse/FLINK-3613
> Project: Flink
>  Issue Type: Improvement
>Reporter: Todd Lisonbee
>Priority: Minor
> Attachments: DataSet-Aggregation-Design-March2016-v1.txt
>
>
> Implement standard deviation, mean, variance for 
> org.apache.flink.api.java.aggregation.Aggregations
> Ideally implementation should be single pass and numerically stable.
> References:
> "Scalable and Numerically Stable Descriptive Statistics in SystemML", Tian et 
> al, International Conference on Data Engineering 2012
> http://dl.acm.org/citation.cfm?id=2310392
> "The Kahan summation algorithm (also known as compensated summation) reduces 
> the numerical errors that occur when adding a sequence of finite precision 
> floating point numbers. Numerical errors arise due to truncation and 
> rounding. These errors can lead to numerical instability when calculating 
> variance."
> https://en.wikipedia.org/wiki/Kahan_summation_algorithm
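
For reference, a single-pass, numerically stable mean/variance accumulator in the spirit of the cited references looks roughly like the sketch below (illustrative names only; this is not the design in the attached document, and a Kahan-compensated sum would be a further refinement):

```scala
// Welford-style update with a Chan-style merge so partial results can be combined.
final class MeanVarianceAccumulator extends Serializable {
  private var count: Long = 0L
  private var mean: Double = 0.0
  private var m2: Double = 0.0 // sum of squared deviations from the running mean

  def add(value: Double): Unit = {
    count += 1
    val delta = value - mean
    mean += delta / count
    m2 += delta * (value - mean)
  }

  def merge(other: MeanVarianceAccumulator): Unit = {
    if (other.count > 0) {
      val delta = other.mean - mean
      val total = count + other.count
      mean = (count * mean + other.count * other.mean) / total
      m2 += other.m2 + delta * delta * count * other.count / total
      count = total
    }
  }

  def variance: Double = if (count > 1) m2 / (count - 1) else Double.NaN
  def stdDev: Double = math.sqrt(variance)
}
```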



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3613) Add standard deviation, mean, variance to list of Aggregations

2016-03-22 Thread Todd Lisonbee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Lisonbee updated FLINK-3613:
-
Attachment: DataSet-Aggregation-Design-March2016-v1.txt

Attached is a design for the improvements to DataSet.aggregate() needed to 
implement additional aggregations like standard deviation.

To keep the public APIs stable, the best path seems to be having 
AggregateOperator implement CustomUnaryOperation, but that feels odd because 
no other Operator is implemented that way. The other options I see aren't 
consistent with the existing Operators either.

I could really use some feedback on this. Thanks.

> Add standard deviation, mean, variance to list of Aggregations
> --
>
> Key: FLINK-3613
> URL: https://issues.apache.org/jira/browse/FLINK-3613
> Project: Flink
>  Issue Type: Improvement
>Reporter: Todd Lisonbee
>Priority: Minor
> Attachments: DataSet-Aggregation-Design-March2016-v1.txt
>
>
> Implement standard deviation, mean, variance for 
> org.apache.flink.api.java.aggregation.Aggregations
> Ideally implementation should be single pass and numerically stable.
> References:
> "Scalable and Numerically Stable Descriptive Statistics in SystemML", Tian et 
> al, International Conference on Data Engineering 2012
> http://dl.acm.org/citation.cfm?id=2310392
> "The Kahan summation algorithm (also known as compensated summation) reduces 
> the numerical errors that occur when adding a sequence of finite precision 
> floating point numbers. Numerical errors arise due to truncation and 
> rounding. These errors can lead to numerical instability when calculating 
> variance."
> https://en.wikipedia.org/wiki/Kahan_summation_algorithm



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3655) Allow comma-separated directories to be specified for FileInputFormat

2016-03-22 Thread Gna Phetsarath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gna Phetsarath updated FLINK-3655:
--
Description: 
Allow comma-separated directories to be specified for FileInputFormat so that a 
DataSource will process the directories sequentially.

env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")}}

Wildcard support would be a bonus.

  was:
Allow comma-separated multiple directories to be specified for FileInputFormat. 

env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")}}

Wildcard support would be a bonus.


> Allow comma-separated directories to be specified for FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Priority: Minor
>
> Allow comma-separated directories to be specified for FileInputFormat so that 
> a DataSource will process the directories sequentially.
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")}}
> Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3655) Allow comma-separated directories to be specified for FileInputFormat

2016-03-22 Thread Gna Phetsarath (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gna Phetsarath updated FLINK-3655:
--
Summary: Allow comma-separated directories to be specified for 
FileInputFormat  (was: Allow comma-separated multiple directories to be 
specified for FileInputFormat)

> Allow comma-separated directories to be specified for FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Priority: Minor
>
> Allow comma-separated multiple directories to be specified for 
> FileInputFormat. 
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")}}
> Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3655) Allow comma-separated multiple directories to be specified for FileInputFormat

2016-03-22 Thread Gna Phetsarath (JIRA)
Gna Phetsarath created FLINK-3655:
-

 Summary: Allow comma-separated multiple directories to be 
specified for FileInputFormat
 Key: FLINK-3655
 URL: https://issues.apache.org/jira/browse/FLINK-3655
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.0.0
Reporter: Gna Phetsarath
Priority: Minor


Allow comma-separated multiple directories to be specified for FileInputFormat. 

env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")}}

Wildcard support would be a bonus.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs

2016-03-22 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206980#comment-15206980
 ] 

Robert Metzger commented on FLINK-2821:
---

That sounds like a very good idea!
+1

> Change Akka configuration to allow accessing actors from different URLs
> ---
>
> Key: FLINK-2821
> URL: https://issues.apache.org/jira/browse/FLINK-2821
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Reporter: Robert Metzger
>Assignee: Maximilian Michels
>
> Akka expects the actor's URL to be exactly matching.
> As pointed out here, cases where users were complaining about this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html
>   - Proxy routing (as described here, send to the proxy URL, receiver 
> recognizes only original URL)
>   - Using hostname / IP interchangeably does not work (we solved this by 
> always putting IP addresses into URLs, never hostnames)
>   - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still 
> no solution to that (but seems not too much of a restriction)
> I am aware that this is not possible due to Akka, so it is actually not a 
> Flink bug. But I think we should track the resolution of the issue here 
> anyways because its affecting our user's satisfaction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206875#comment-15206875
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1741#issuecomment-199934108
  
Thanks a lot @tillrohrmann for taking the time to look into the bulk of 
code. I have already eagerly addressed most of your comments in the additional 
commits I pushed. Next, I'll revise the cluster shutdown logic and the 
re-connect in case of unresponsiveness of the resource manager.


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-22 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1741#issuecomment-199934108
  
Thanks a lot @tillrohrmann for taking the time to look into the bulk of 
code. I have already eagerly addressed most of your comments in the additional 
commits I pushed. Next, I'll revise the cluster shutdown logic and the 
re-connect in case of unresponsiveness of the resource manager.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57031901
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,121 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
--- End diff --

Done (doesn't change the diff).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206850#comment-15206850
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57031901
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,121 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
--- End diff --

Done (doesn't change the diff).


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206835#comment-15206835
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1741#issuecomment-199920379
  
Great work @mxm! I really like the new architecture :-)

I had some minor comments. The only thing which is important to fix is that 
the `JobManager` can terminate if it is not connected to a 
`FlinkResourceManager`. Apart from that, I think we should merge it soon so 
that it can get some exposure. 


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-22 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1741#issuecomment-199920379
  
Great work @mxm! I really like the new architecture :-)

I had some minor comments. The only thing which is important to fix is that 
the `JobManager` can terminate if it is not connected to a 
`FlinkResourceManager`. Apart from that, I think we should merge it soon so 
that it can get some exposure. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206832#comment-15206832
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57030485
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.yarn.messages.ContainersAllocated;
+import org.apache.flink.yarn.messages.ContainersComplete;
+
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Specialized Flink Resource Manager implementation for YARN clusters. It 
is started as the
+ * YARN ApplicationMaster and implements the YARN-specific logic for 
container requests and failure
+ * monitoring.
+ */
+public class YarnFlinkResourceManager extends 
FlinkResourceManager {
+   
+   /** The heartbeat interval while the resource master is waiting for 
containers */
+   private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
+
+   /** The default heartbeat interval during regular operation */
+   private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
+
+   /** The containers where a TaskManager is starting and we are waiting 
for it to register */
+   private final Map containersInLaunch;
+
+   /** Containers we have released, where we are waiting for an 
acknowledgement that
+* they are released */
+   private final Map containersBeingReturned;
+
+   /** The YARN / Hadoop configuration object */
+   private final YarnConfiguration yarnConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final ContainerLaunchContext taskManagerLaunchContext;
+
+   /** Host name for the container running this process */
+   private fi

[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57030485
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.yarn.messages.ContainersAllocated;
+import org.apache.flink.yarn.messages.ContainersComplete;
+
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Specialized Flink Resource Manager implementation for YARN clusters. It 
is started as the
+ * YARN ApplicationMaster and implements the YARN-specific logic for 
container requests and failure
+ * monitoring.
+ */
+public class YarnFlinkResourceManager extends 
FlinkResourceManager {
+   
+   /** The heartbeat interval while the resource master is waiting for 
containers */
+   private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
+
+   /** The default heartbeat interval during regular operation */
+   private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
+
+   /** The containers where a TaskManager is starting and we are waiting 
for it to register */
+   private final Map containersInLaunch;
+
+   /** Containers we have released, where we are waiting for an 
acknowledgement that
+* they are released */
+   private final Map containersBeingReturned;
+
+   /** The YARN / Hadoop configuration object */
+   private final YarnConfiguration yarnConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final ContainerLaunchContext taskManagerLaunchContext;
+
+   /** Host name for the container running this process */
+   private final String applicationMasterHostName;
+
+   /** Web interface URL, may be null */
+   private final String webInterfaceURL;
+
+   /** Default heartbeat interval between this actor and the YARN resource 
manager */
+   private final 

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206815#comment-15206815
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57028233
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,121 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
--- End diff --

Could be good to log the failure.


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57028233
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -312,59 +323,121 @@ class JobManager(
 
   leaderSessionID = None
 
-case RegisterTaskManager(
-  connectionInfo,
-  hardwareInformation,
-  numberOfSlots) =>
+case msg: RegisterResourceManager =>
+  log.debug(s"Resource manager registration: $msg")
+
+  // ditch current resource manager (if any)
+  currentResourceManager = Option(msg.resourceManager())
+
+  val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
+instance => instance.getResourceId).toList.asJava
+
+  // confirm registration and send known task managers with their 
resource ids
+  sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
+
+case msg: DisconnectResourceManager =>
+  log.debug(s"Resource manager disconnect: $msg")
+
+  currentResourceManager match {
+case Some(rm) if rm.equals(msg.resourceManager()) =>
+  // we should ditch the current resource manager
+  log.debug(s"Disconnecting resource manager $rm.")
+  // send the old one a disconnect message
+  rm ! decorateMessage(new TriggerRegistrationAtJobManager(self))
+  currentResourceManager = None
+case None =>
+  // not connected, thus ignoring this message
+  log.warn(s"No resource manager ${msg.resourceManager()} 
connected. Can't disconnect.")
+  }
+
+case msg @ RegisterTaskManager(
+  resourceId,
+  connectionInfo,
+  hardwareInformation,
+  numberOfSlots) =>
+  // we are being informed by the ResourceManager that a new task 
manager is available
+  log.debug(s"RegisterTaskManager: $msg")
 
   val taskManager = sender()
 
+  currentResourceManager match {
+case Some(rm) =>
+  val future = (rm ? decorateMessage(new 
RegisterResource(taskManager, msg)))(timeout)
+  future.onComplete {
+case scala.util.Success(response) =>
+  // the resource manager is available and answered
+  self ! response
+case scala.util.Failure(t) =>
--- End diff --

Could be good to log the failure.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs

2016-03-22 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206810#comment-15206810
 ] 

Stephan Ewen commented on FLINK-2821:
-

How about the following solution (multiple steps):

  - From Flink 1.1 on, we make Scala 2.11 the default Scala version. That 
version is stable and many people seem to use it anyway (my impression from 
the mailing lists).
  - We make the default Akka version Akka 2.4.x
  - The "change-scala-version.sh" script needs to adjust both Scala and Akka 
versions.

That way, the default releases (for users who don't care which Scala version) 
would have a fix.
Only specially downgraded versions would keep the restriction.

> Change Akka configuration to allow accessing actors from different URLs
> ---
>
> Key: FLINK-2821
> URL: https://issues.apache.org/jira/browse/FLINK-2821
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Runtime
>Reporter: Robert Metzger
>Assignee: Maximilian Michels
>
> Akka expects the actor's URL to match exactly.
> As pointed out here, cases where users were complaining about this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html
>   - Proxy routing (as described here, send to the proxy URL, receiver 
> recognizes only original URL)
>   - Using hostname / IP interchangeably does not work (we solved this by 
> always putting IP addresses into URLs, never hostnames)
>   - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still 
> no solution to that (but seems not too much of a restriction)
> I am aware that this is not possible due to Akka, so it is actually not a 
> Flink bug. But I think we should track the resolution of the issue here 
> anyway because it's affecting our users' satisfaction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3654) Disable Write-Ahead-Log in RocksDB State

2016-03-22 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-3654:

Component/s: (was: Sure)
 Streaming

> Disable Write-Ahead-Log in RocksDB State
> 
>
> Key: FLINK-3654
> URL: https://issues.apache.org/jira/browse/FLINK-3654
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> We do our own checkpointing of the RocksDB database so the WAL is useless to 
> us. Disabling writes to the WAL should give us a very large performance boost.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3654) Disable Write-Ahead-Log in RocksDB State

2016-03-22 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-3654:
---

 Summary: Disable Write-Ahead-Log in RocksDB State
 Key: FLINK-3654
 URL: https://issues.apache.org/jira/browse/FLINK-3654
 Project: Flink
  Issue Type: Improvement
  Components: Sure
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


We do our own checkpointing of the RocksDB database so the WAL is useless to 
us. Disabling writes to the WAL should give us a very large performance boost.
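
As a rough illustration only (not the actual backend change), disabling the 
WAL with the RocksDB Java API comes down to passing WriteOptions with 
setDisableWAL(true) on every write. The database path and key/value bytes 
below are placeholders, and resource cleanup is omitted for brevity.

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteOptions;

    public class DisableWalSketch {

        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();

            Options options = new Options().setCreateIfMissing(true);
            RocksDB db = RocksDB.open(options, "/tmp/flink-rocksdb-sketch");

            // Skip the write-ahead log: recovery comes from our own
            // checkpoints of the database, not from RocksDB's WAL.
            WriteOptions writeOptions = new WriteOptions().setDisableWAL(true);

            // Every write issued by the state backend would pass these options.
            db.put(writeOptions, "key".getBytes(), "value".getBytes());
        }
    }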



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206777#comment-15206777
 ] 

Robert Metzger commented on FLINK-3211:
---

Great, thank you. I'm looking forward to reviewing it.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some Kinesis vs. Kafka 
> differences are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998
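
To make the per-shard, SDK-based consumption sketched in the description above 
a bit more concrete, here is a minimal, hypothetical read loop for a single 
shard using the low-level AWS SDK for Java. The stream name, shard id, 
iterator type, and polling interval are placeholders, and this is not the 
proposed connector code.

    import com.amazonaws.services.kinesis.AmazonKinesisClient;
    import com.amazonaws.services.kinesis.model.GetRecordsRequest;
    import com.amazonaws.services.kinesis.model.GetRecordsResult;
    import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
    import com.amazonaws.services.kinesis.model.Record;

    public class ShardConsumerSketch {

        public static void main(String[] args) throws InterruptedException {
            // Uses the default AWS credential chain.
            AmazonKinesisClient kinesis = new AmazonKinesisClient();

            // One simple connection per shard: get an iterator at the desired offset.
            String shardIterator = kinesis.getShardIterator(
                new GetShardIteratorRequest()
                    .withStreamName("my-stream")             // placeholder
                    .withShardId("shardId-000000000000")     // placeholder
                    .withShardIteratorType("TRIM_HORIZON"))  // or a specific sequence number
                .getShardIterator();

            while (shardIterator != null) {
                GetRecordsResult result = kinesis.getRecords(
                    new GetRecordsRequest().withShardIterator(shardIterator).withLimit(100));

                for (Record record : result.getRecords()) {
                    // A real source would deserialize and emit the record and remember
                    // record.getSequenceNumber() for exactly-once checkpointing.
                    System.out.println(record.getSequenceNumber());
                }

                shardIterator = result.getNextShardIterator();
                Thread.sleep(200);  // stay below the per-shard read rate limits
            }
        }
    }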



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206770#comment-15206770
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 3/22/16 4:52 PM:
-

No problem :) I'll have it ready soon, hopefully by the end of March.


was (Author: tzulitai):
No problem :) I'll have it ready soon.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some Kinesis vs. Kafka 
> differences are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206770#comment-15206770
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-3211 at 3/22/16 4:51 PM:
-

No problem :) I'll have it ready soon.


was (Author: tzulitai):
No problem :)

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some Kinesis vs. Kafka 
> differences are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206770#comment-15206770
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


No problem :)

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some Kinesis vs. Kafka 
> differences are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3579) Improve String concatenation

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206764#comment-15206764
 ] 

ASF GitHub Bot commented on FLINK-3579:
---

Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1821#issuecomment-199900035
  
Updated PR. RexNodeTranslator had a line longer than 100 characters. That is 
now corrected.


> Improve String concatenation
> 
>
> Key: FLINK-3579
> URL: https://issues.apache.org/jira/browse/FLINK-3579
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Timo Walther
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>
> Concatenation of a String and non-String does not work properly.
> e.g. {{f0 + 42}} leads to RelBuilder Exception
> ExpressionParser does not like {{f0 + 42.cast(STRING)}} either.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)

2016-03-22 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1821#issuecomment-199900035
  
Updated PR. RexNodeTranslator had a line longer than 100 characters. That is 
now corrected.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206695#comment-15206695
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57017514
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.yarn.messages.ContainersAllocated;
+import org.apache.flink.yarn.messages.ContainersComplete;
+
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Specialized Flink Resource Manager implementation for YARN clusters. It 
is started as the
+ * YARN ApplicationMaster and implements the YARN-specific logic for 
container requests and failure
+ * monitoring.
+ */
+public class YarnFlinkResourceManager extends 
FlinkResourceManager {
+   
+   /** The heartbeat interval while the resource master is waiting for 
containers */
+   private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
+
+   /** The default heartbeat interval during regular operation */
+   private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
+
+   /** The containers where a TaskManager is starting and we are waiting 
for it to register */
+   private final Map containersInLaunch;
+
+   /** Containers we have released, where we are waiting for an 
acknowledgement that
+* they are released */
+   private final Map containersBeingReturned;
+
+   /** The YARN / Hadoop configuration object */
+   private final YarnConfiguration yarnConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final ContainerLaunchContext taskManagerLaunchContext;
+
+   /** Host name for the container running this process */
+   p

[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206755#comment-15206755
 ] 

Robert Metzger commented on FLINK-3211:
---

Hi Tzu,
thank you for your response. I'm happy to see that you are concerned about 
having good tests for the code. We have such tests for all connectors that 
connect to an open-source system running on the JVM.

But, for example, our RabbitMQ connector also doesn't contain any tests 
connecting to a real RMQ; there are some tests using a mocked RMQ.
It's fine to follow the same approach with the Kinesis connector. Let's just 
manually test the code on AWS.

Please open a pull request as soon as you have it ready so that we can start 
reviewing it. It would be nice to put it into the Flink 1.1 release.
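
For illustration of the mocked-service approach mentioned above, a unit test 
could look roughly like the following. The KinesisClientFacade interface and 
countRecords helper are purely hypothetical stand-ins for connector code; they 
are not part of Flink or the AWS SDK.

    import static org.junit.Assert.assertEquals;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.Arrays;
    import java.util.List;
    import org.junit.Test;

    public class MockedKinesisTestSketch {

        /** Hypothetical thin facade; a connector could wrap the AWS SDK behind something like this. */
        interface KinesisClientFacade {
            List<String> readBatch(String shardId);
        }

        /** Hypothetical piece of connector logic that is testable without AWS. */
        static int countRecords(KinesisClientFacade client, String shardId) {
            return client.readBatch(shardId).size();
        }

        @Test
        public void testCountsRecordsFromMockedService() {
            KinesisClientFacade client = mock(KinesisClientFacade.class);
            when(client.readBatch("shard-0")).thenReturn(Arrays.asList("a", "b", "c"));

            // No real AWS endpoint is contacted; the logic runs against the mock.
            assertEquals(3, countRecords(client, "shard-0"));
        }
    }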



> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "lease table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some Kinesis vs. Kafka 
> differences are described as follows:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straightforward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-co

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206753#comment-15206753
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57021932
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import 
org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class ResourceManagerITCase {
+
+   private static ActorSystem system;
+
+   private static Configuration config = new Configuration();
+
+   @Before
+   public void setup() {
+   system = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+   }
+
+   @After
+   public void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Tests whether the resource manager connects and reconciles existing 
task managers.
+*/
+   @Test
+   public void testResourceManagerReconciliation() {
+
+   new JavaTestKit(system){{
+   new Within(duration("10 seconds")) {
+   @Override
+   protected void run() {
+
+   ActorGateway jobManager = 
TestingUtils.createJobManager(system, config);
+   ActorGateway me =
+   TestingUtils.createForwardingActor(system, 
getTestActor(), Option.empty());
+
+   // !! no resource manager started !!
+
+   ResourceID resourceID = ResourceID.generate();
+
+   jobManager.tell(
+   new RegistrationMessages.RegisterTaskManager(
+   resourceID,
+   
Mockito.mock(InstanceConnectionInfo.class),
+   null,
+   1),
+   me);
+
+   
expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);
+
+   // now start the resource manager
+   ActorGateway resourceManager =
+   TestingUtils.createResourceManager(system, 
jobManager.actor(), config);
+
+   // register at testing job manager to receive a message 
once a resource manager registers
+   resourceManager.tell(new 
TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+   // Wait for resource manager
+   expectMsgEquals(Messages.getAcknowledge());
+
+   // check if we registered the task manager resource
+  

[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57021932
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import 
org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.TestingResourceManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.Option;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class ResourceManagerITCase {
+
+   private static ActorSystem system;
+
+   private static Configuration config = new Configuration();
+
+   @Before
+   public void setup() {
+   system = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
+   }
+
+   @After
+   public void teardown() {
+   JavaTestKit.shutdownActorSystem(system);
+   }
+
+   /**
+* Tests whether the resource manager connects and reconciles existing 
task managers.
+*/
+   @Test
+   public void testResourceManagerReconciliation() {
+
+   new JavaTestKit(system){{
+   new Within(duration("10 seconds")) {
+   @Override
+   protected void run() {
+
+   ActorGateway jobManager = 
TestingUtils.createJobManager(system, config);
+   ActorGateway me =
+   TestingUtils.createForwardingActor(system, 
getTestActor(), Option.empty());
+
+   // !! no resource manager started !!
+
+   ResourceID resourceID = ResourceID.generate();
+
+   jobManager.tell(
+   new RegistrationMessages.RegisterTaskManager(
+   resourceID,
+   
Mockito.mock(InstanceConnectionInfo.class),
+   null,
+   1),
+   me);
+
+   
expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);
+
+   // now start the resource manager
+   ActorGateway resourceManager =
+   TestingUtils.createResourceManager(system, 
jobManager.actor(), config);
+
+   // register at testing job manager to receive a message 
once a resource manager registers
+   resourceManager.tell(new 
TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+   // Wait for resource manager
+   expectMsgEquals(Messages.getAcknowledge());
+
+   // check if we registered the task manager resource
+   resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), me);
+
+   TestingResourceManager.GetRegisteredResourcesReply 
reply =
+   
expectMsgClass(TestingResourceManager.GetRegiste

[jira] [Commented] (FLINK-3579) Improve String concatenation

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206750#comment-15206750
 ] 

ASF GitHub Bot commented on FLINK-3579:
---

Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1821#issuecomment-199897376
  
@vasia 
Thanks for the info. Let me check that and update the PR.


> Improve String concatenation
> 
>
> Key: FLINK-3579
> URL: https://issues.apache.org/jira/browse/FLINK-3579
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Reporter: Timo Walther
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>
> Concatenation of a String and non-String does not work properly.
> e.g. {{f0 + 42}} leads to RelBuilder Exception
> ExpressionParser does not like {{f0 + 42.cast(STRING)}} either.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: FLINK-3579 Improve String concatenation (Ram)

2016-03-22 Thread ramkrish86
Github user ramkrish86 commented on the pull request:

https://github.com/apache/flink/pull/1821#issuecomment-199897376
  
@vasia 
Thanks for the info. Let me check that and update the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206739#comment-15206739
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Hi, thanks Roy.

I don't think using the actual Kinesis API will be feasible, since the tests 
will be part of the build.
I did take a look at kinesalite at first and gave it a try, but it was still quite 
tedious to integrate into the code base just for the sake of the tests, since it isn't 
Java/Scala.

I am still quite new to submitting PRs to the Flink community, and was 
uncertain about sending out the PR before all tests were properly covered.
Nevertheless, if integration tests aren't too much of an issue for the 
connector and the connector is urgently needed, I can start polishing things up 
and prepare the PR!

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka 
> differences is described as following:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straight forward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev
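For reference, the per-shard, SDK-based read loop described above could look roughly like the following (a minimal sketch against the AWS SDK for Java v1; the stream name, shard id, starting sequence number and the `running` flag are illustrative, and error handling/backoff is omitted):

    import com.amazonaws.services.kinesis.AmazonKinesisClient;
    import com.amazonaws.services.kinesis.model.GetRecordsRequest;
    import com.amazonaws.services.kinesis.model.GetRecordsResult;
    import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
    import com.amazonaws.services.kinesis.model.Record;

    AmazonKinesisClient kinesis = new AmazonKinesisClient();       // credentials/region via the default chain

    // obtain an iterator for a single shard, resuming after a known sequence number
    GetShardIteratorRequest iteratorRequest = new GetShardIteratorRequest()
        .withStreamName("my-stream")                               // illustrative
        .withShardId("shardId-000000000000")                       // illustrative
        .withShardIteratorType("AFTER_SEQUENCE_NUMBER")
        .withStartingSequenceNumber(lastSeenSequenceNumber);       // e.g. restored from a checkpoint
    String shardIterator = kinesis.getShardIterator(iteratorRequest).getShardIterator();

    while (running) {
        GetRecordsResult result = kinesis.getRecords(
            new GetRecordsRequest().withShardIterator(shardIterator).withLimit(100));
        for (Record record : result.getRecords()) {
            // deserialize record.getData() and emit it; record.getSequenceNumber() is the
            // per-shard position a checkpoint would have to remember
        }
        shardIterator = result.getNextShardIterator();
    }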

[jira] [Commented] (FLINK-2946) Add orderBy() to Table API

2016-03-22 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206731#comment-15206731
 ] 

Dawid Wysakowicz commented on FLINK-2946:
-

Hi, I have started working on this issue and would be really grateful if 
someone could have a quick look at what I did so far. Did I go in the right 
direction? Also, I am not sure how to approach the 
parallelism/sortPartition/partitionByRange issue. Any advice is welcome.

The changes are here: https://github.com/dawidwys/flink/tree/tableSort

> Add orderBy() to Table API
> --
>
> Key: FLINK-2946
> URL: https://issues.apache.org/jira/browse/FLINK-2946
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>
> In order to implement a FLINK-2099 prototype that uses the Table APIs code 
> generation facilities, the Table API needs a sorting feature.
> I would implement it in the next days. Ideas on how to implement such a sorting 
> feature are very welcome. Is there a more efficient way than 
> {{.sortPartition(...).setParallelism(1)}}? Is it better to sort locally on the 
> nodes first and finally sort on one node afterwards?
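To make the trade-off above concrete, the two variants under discussion would look roughly like this on the DataSet API (a minimal sketch; `input`, the tuple type and the key position 0 are illustrative):

    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;

    // variant 1: funnel everything through one sorted partition -- a total order, but parallelism 1
    DataSet<Tuple2<Integer, String>> globallySorted =
        input.sortPartition(0, Order.ASCENDING).setParallelism(1);

    // variant 2: range-partition by the key first, then sort each partition locally in parallel;
    // partitions are sorted internally and ordered relative to each other
    DataSet<Tuple2<Integer, String>> rangeSorted =
        input.partitionByRange(0).sortPartition(0, Order.ASCENDING);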



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206709#comment-15206709
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3211:


Hi Roy & Robert,

Apologies for the late reply.
I already have a working version that I have been using in non-production 
environments for a while now.
The main reason for not sending a PR yet is the lack of tests for 
the Kinesis producer. The main issue is with the integration tests, which are hard 
since Kinesis is an AWS service and doesn't come with a local version.

If this isn't too much of an issue, I can surely PR soon!

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka 
> differences is described as following:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straight forward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka

[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206703#comment-15206703
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57018050
  
--- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---
@@ -115,7 +115,7 @@ class ApplicationClient(
   val jobManager = context.actorSelection(jobManagerAkkaURL)
 
   jobManager ! decorateMessage(
-RegisterApplicationClient
+RegisterInfoMessageListener.get()
--- End diff --

`getInstance`?


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Roy Ben-Alta (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206699#comment-15206699
 ] 

Roy Ben-Alta commented on FLINK-3211:
-

Hi Robert,

I am the Business Development Manager for Amazon Kinesis, and it would be great 
to have an Apache Flink connector to Amazon Kinesis. Please send me an email at 
benal...@amazon.com and we can chat over email. I will be happy to provide you 
with useful collateral regarding Amazon Kinesis Streams.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka 
> differences is described as following:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straight forward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998



--
This message was sent

[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Roy Ben-Alta (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206716#comment-15206716
 ] 

Roy Ben-Alta commented on FLINK-3211:
-

Hi Tzu,

There is no local version of Kinesis, but there is one project I found on GitHub: 
https://github.com/mhart/kinesalite
However, you should have no issue testing it on AWS using the Kinesis API (you 
can create a stream with x shards and delete it once your tests are 
completed).
Feel free to contact me at benaltar@amazon and I will be happy to provide you 
with some credits if needed for the sake of the tests.

Roy.


> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka 
> differences is described as following:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straight forward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> h

[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57017514
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.yarn.messages.ContainersAllocated;
+import org.apache.flink.yarn.messages.ContainersComplete;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Specialized Flink Resource Manager implementation for YARN clusters. It is started as the
+ * YARN ApplicationMaster and implements the YARN-specific logic for container requests and failure
+ * monitoring.
+ */
+public class YarnFlinkResourceManager extends FlinkResourceManager {
+   
+   /** The heartbeat interval while the resource master is waiting for containers */
+   private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
+
+   /** The default heartbeat interval during regular operation */
+   private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
+
+   /** The containers where a TaskManager is starting and we are waiting for it to register */
+   private final Map containersInLaunch;
+
+   /** Containers we have released, where we are waiting for an acknowledgement that
+    * they are released */
+   private final Map containersBeingReturned;
+
+   /** The YARN / Hadoop configuration object */
+   private final YarnConfiguration yarnConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final ContaineredTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final ContainerLaunchContext taskManagerLaunchContext;
+
+   /** Host name for the container running this process */
+   private final String applicationMasterHostName;
+
+   /** Web interface URL, may be null */
+   private final String webInterfaceURL;
+
+   /** Default heartbeat interval between this actor and the YARN resource manager */
+   priva

[jira] [Commented] (FLINK-3653) recovery.zookeeper.storageDir is not documented on the configuration page

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206705#comment-15206705
 ] 

ASF GitHub Bot commented on FLINK-3653:
---

GitHub user stefanobaghino opened a pull request:

https://github.com/apache/flink/pull/1828

[FLINK-3653] recovery.zookeeper.storageDir is not documented on the 
configuration page

Also: minor _hotfix_ to the `state.backend.fs.checkpointdir` option 
description (punctuation, casing and formatting, _boy scout rule_)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/radicalbit/flink 3653

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1828.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1828


commit 28f279ad70f1ac0316b770fab62a1c1568e0aac5
Author: Stefano Baghino 
Date:   2016-03-22T16:17:32Z

[FLINK-3653] recovery.zookeeper.storageDir is not documented on the 
configuration page




> recovery.zookeeper.storageDir is not documented on the configuration page
> -
>
> Key: FLINK-3653
> URL: https://issues.apache.org/jira/browse/FLINK-3653
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0
>Reporter: Stefano Baghino
>Assignee: Stefano Baghino
>Priority: Minor
> Fix For: 1.1.0
>
>
> The {{recovery.zookeeper.storageDir}} option is documented in the HA page but 
> is missing from the configuration page. Since it's required for HA I think it 
> would be a good idea to have it on both pages.
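For context, the option appears in an HA setup roughly like this in flink-conf.yaml (a sketch based on the HA guide of that time; hosts and paths are illustrative):

    recovery.mode: zookeeper
    recovery.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
    recovery.zookeeper.storageDir: hdfs:///flink/recovery
    state.backend: filesystem
    state.backend.fs.checkpointdir: hdfs:///flink/checkpoints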



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206704#comment-15206704
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57018075
  
--- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---
@@ -148,7 +148,7 @@ class ApplicationClient(
   INITIAL_POLLING_DELAY,
   WAIT_FOR_YARN_INTERVAL,
   yarnJobManager.get,
-  decorateMessage(PollYarnClusterStatus))
+  decorateMessage(GetClusterStatus.get()))
--- End diff --

`getInstance`?


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57018050
  
--- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---
@@ -115,7 +115,7 @@ class ApplicationClient(
   val jobManager = context.actorSelection(jobManagerAkkaURL)
 
   jobManager ! decorateMessage(
-RegisterApplicationClient
+RegisterInfoMessageListener.get()
--- End diff --

`getInstance`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3653] recovery.zookeeper.storageDir is ...

2016-03-22 Thread stefanobaghino
GitHub user stefanobaghino opened a pull request:

https://github.com/apache/flink/pull/1828

[FLINK-3653] recovery.zookeeper.storageDir is not documented on the 
configuration page

Also: minor _hotfix_ to the `state.backend.fs.checkpointdir` option 
description (punctuation, casing and formatting, _boy scout rule_)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/radicalbit/flink 3653

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1828.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1828


commit 28f279ad70f1ac0316b770fab62a1c1568e0aac5
Author: Stefano Baghino 
Date:   2016-03-22T16:17:32Z

[FLINK-3653] recovery.zookeeper.storageDir is not documented on the 
configuration page




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57018075
  
--- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---
@@ -148,7 +148,7 @@ class ApplicationClient(
   INITIAL_POLLING_DELAY,
   WAIT_FOR_YARN_INTERVAL,
   yarnJobManager.get,
-  decorateMessage(PollYarnClusterStatus))
+  decorateMessage(GetClusterStatus.get()))
--- End diff --

`getInstance`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3211) Add AWS Kinesis streaming connector

2016-03-22 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206677#comment-15206677
 ] 

Robert Metzger commented on FLINK-3211:
---

Hi Roy,

I'll try to contact Tzu-Li via email to get an estimate of when this will be done. How 
urgently do you need the connector?
Maybe I can allocate some time soon to get a first version into Flink.

> Add AWS Kinesis streaming connector
> ---
>
> Key: FLINK-3211
> URL: https://issues.apache.org/jira/browse/FLINK-3211
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Affects Versions: 1.0.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> AWS Kinesis is a widely adopted message queue used by AWS users, much like a 
> cloud service version of Apache Kafka. Support for AWS Kinesis will be a 
> great addition to the handful of Flink's streaming connectors to external 
> systems and a great reach out to the AWS community.
> AWS supports two different ways to consume Kinesis data: with the low-level 
> AWS SDK [1], or with the high-level KCL (Kinesis Client Library) [2]. AWS SDK 
> can be used to consume Kinesis data, including stream read beginning from a 
> specific offset (or "record sequence number" in Kinesis terminology). On the 
> other hand, AWS officially recommends using KCL, which offers a higher-level 
> of abstraction that also comes with checkpointing and failure recovery by 
> using a KCL-managed AWS DynamoDB "leash table" as the checkpoint state 
> storage.
> However, KCL is essentially a stream processing library that wraps all the 
> partition-to-task (or "shard" in Kinesis terminology) determination and 
> checkpointing to allow the user to focus only on streaming application logic. 
> This leads to the understanding that we can not use the KCL to implement the 
> Kinesis streaming connector if we are aiming for a deep integration of Flink 
> with Kinesis that provides exactly once guarantees (KCL promises only 
> at-least-once). Therefore, AWS SDK will be the way to go for the 
> implementation of this feature.
> With the ability to read from specific offsets, and also the fact that 
> Kinesis and Kafka share a lot of similarities, the basic principles of the 
> implementation of Flink's Kinesis streaming connector will very much resemble 
> the Kafka connector. We can basically follow the outlines described in 
> [~StephanEwen]'s description [3] and [~rmetzger]'s Kafka connector 
> implementation [4]. A few tweaks due to some of Kinesis v.s. Kafka 
> differences is described as following:
> 1. While the Kafka connector can support reading from multiple topics, I 
> currently don't think this is a good idea for Kinesis streams (a Kinesis 
> Stream is logically equivalent to a Kafka topic). Kinesis streams can exist 
> in different AWS regions, and each Kinesis stream under the same AWS user 
> account may have completely independent access settings with different 
> authorization keys. Overall, a Kinesis stream feels like a much more 
> consolidated resource compared to Kafka topics. It would be great to hear 
> more thoughts on this part.
> 2. While Kafka has brokers that can hold multiple partitions, the only 
> partitioning abstraction for AWS Kinesis is "shards". Therefore, in contrast 
> to the Kafka connector having per broker connections where the connections 
> can handle multiple Kafka partitions, the Kinesis connector will only need to 
> have simple per shard connections.
> 3. Kinesis itself does not support committing offsets back to Kinesis. If we 
> were to implement this feature like the Kafka connector with Kafka / ZK to 
> sync outside view of progress, we probably could use ZK or DynamoDB like the 
> way KCL works. More thoughts on this part will be very helpful too.
> As for the Kinesis Sink, it should be possible to use the AWS KPL (Kinesis 
> Producer Library) [5]. However, for higher code consistency with the proposed 
> Kinesis Consumer, I think it will be better to stick with the AWS SDK for the 
> implementation. The implementation should be straight forward, being almost 
> if not completely the same as the Kafka sink.
> References:
> [1] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-sdk.html
> [2] 
> http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html
> [3] 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-Connector-td2872.html
> [4] http://data-artisans.com/kafka-flink-a-practical-how-to/
> [5] 
> http://docs.aws.amazon.com//kinesis/latest/dev/developing-producers-with-kpl.html#d0e4998



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3653) recovery.zookeeper.storageDir is not documented on the configuration page

2016-03-22 Thread Stefano Baghino (JIRA)
Stefano Baghino created FLINK-3653:
--

 Summary: recovery.zookeeper.storageDir is not documented on the 
configuration page
 Key: FLINK-3653
 URL: https://issues.apache.org/jira/browse/FLINK-3653
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.0.0
Reporter: Stefano Baghino
Assignee: Stefano Baghino
Priority: Minor
 Fix For: 1.1.0


The {{recovery.zookeeper.storageDir}} option is documented in the HA page but 
is missing from the configuration page. Since it's required for HA I think it 
would be a good idea to have it on both pages.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206658#comment-15206658
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57015042
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is the executable entry point for the YARN application 
master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link YarnFlinkResourceManager}.
+ * 
+ * The JobManager handles Flink job execution, while the 
YarnFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnApplicationMasterRunner {
+
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+   
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+   
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+
+   // 
-

[GitHub] flink pull request: [FLINK-3544] Introduce ResourceManager compone...

2016-03-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57015042
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is the executable entry point for the YARN application 
master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link YarnFlinkResourceManager}.
+ * 
+ * The JobManager handles Flink job execution, while the 
YarnFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnApplicationMasterRunner {
+
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+   
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+   
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+
+   // 

+   //  Program entry point
+   // 

+
+   /**
+* The entry point for the YARN application master. 
+*
+* @param arg

[jira] [Commented] (FLINK-3547) Add support for streaming projection, selection, and union

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206646#comment-15206646
 ] 

ASF GitHub Bot commented on FLINK-3547:
---

Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1820#issuecomment-199881323
  
Thanks for the review! I'll see how we can share the common parts between 
DataSet and DataStream translation. I will also open a JIRA for reworking the 
tests.


> Add support for streaming projection, selection, and union
> --
>
> Key: FLINK-3547
> URL: https://issues.apache.org/jira/browse/FLINK-3547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3547] add support for streaming filter,...

2016-03-22 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/1820#issuecomment-199881323
  
Thanks for the review! I'll see how we can share the common parts between 
DataSet and DataStream translation. I will also open a JIRA for reworking the 
tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3652) Enable UnusedImports check for Scala checkstyle

2016-03-22 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-3652:
-

 Summary: Enable UnusedImports check for Scala checkstyle
 Key: FLINK-3652
 URL: https://issues.apache.org/jira/browse/FLINK-3652
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Priority: Minor


For some reason, we don't have the UnusedImports check enabled in the Scala 
checkstyle. This is not consistent with Java, where we strictly check for unused 
imports.

I propose to enable it and fix any existing unused imports.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2445] Add tests for HadoopOutputFormats

2016-03-22 Thread mliesenberg
Github user mliesenberg commented on the pull request:

https://github.com/apache/flink/pull/1625#issuecomment-199878190
  
It's only the build job with the `hadoop.profile=1` option; for the 
others the tests are passing. 
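For anyone trying to reproduce this locally, that job roughly corresponds to building and testing against the Hadoop 1 profile (a sketch of the invocation; the exact goals Travis runs may differ):

    mvn clean verify -Dhadoop.profile=1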


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2445) Add tests for HadoopOutputFormats

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206628#comment-15206628
 ] 

ASF GitHub Bot commented on FLINK-2445:
---

Github user mliesenberg commented on the pull request:

https://github.com/apache/flink/pull/1625#issuecomment-199878190
  
It's only the build job with the `hadoop.profile=1` option; for the 
others the tests are passing. 


> Add tests for HadoopOutputFormats
> -
>
> Key: FLINK-2445
> URL: https://issues.apache.org/jira/browse/FLINK-2445
> Project: Flink
>  Issue Type: Test
>  Components: Hadoop Compatibility, Tests
>Affects Versions: 0.9.1, 0.10.0
>Reporter: Fabian Hueske
>Assignee: Martin Liesenberg
>  Labels: starter
>
> The HadoopOutputFormats and HadoopOutputFormatBase classes are not 
> sufficiently covered by unit tests.
> We need tests that ensure that the methods of the wrapped Hadoop 
> OutputFormats are correctly called. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206625#comment-15206625
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57011989
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is the executable entry point for the YARN application 
master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link YarnFlinkResourceManager}.
+ * 
+ * The JobManager handles Flink job execution, while the 
YarnFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class YarnApplicationMasterRunner {
+
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+   
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+   
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+
+   // 
--

[jira] [Commented] (FLINK-3547) Add support for streaming projection, selection, and union

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206618#comment-15206618
 ] 

ASF GitHub Bot commented on FLINK-3547:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1820#issuecomment-199877286
  
I think we should try to share more code between the DataSet and DataStream 
translation. Otherwise it looks good. 

We should also rework the tests and extract tests that check the Table API 
parts that are common for DataSet and DataStream. I wouldn't do that in this PR 
though.


> Add support for streaming projection, selection, and union
> --
>
> Key: FLINK-3547
> URL: https://issues.apache.org/jira/browse/FLINK-3547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API
>Reporter: Vasia Kalavri
>Assignee: Vasia Kalavri
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3544) ResourceManager runtime components

2016-03-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206622#comment-15206622
 ] 

ASF GitHub Bot commented on FLINK-3544:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1741#discussion_r57011773
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -39,24 +39,32 @@ import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint._
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.execution.SuppressRestartsException
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager
+import org.apache.flink.runtime.clusterframework.messages._
+import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
+import org.apache.flink.runtime.clusterframework.types.ResourceID
 import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.{RestartStrategy, 
RestartStrategyFactory}
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, 
ExecutionJobVertex}
-import org.apache.flink.runtime.instance.{AkkaActorGateway, 
InstanceManager}
+import org.apache.flink.runtime.instance.{HardwareDescription, 
InstanceConnectionInfo,
--- End diff --

`HardwareDescription` and `InstanceConnectionInfo` are unused


> ResourceManager runtime components
> --
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

