[GitHub] flink issue #3153: [hotfix] [docs] Insert closing parentheses on "Flink Data...

2017-01-19 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3153
  
Thanks for the fix @keijiyoshida.
+1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5567) Introduce and migrate current table statistics to FlinkStatistics

2017-01-19 Thread zhangjing (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangjing updated FLINK-5567:
-
Description: 
1. Introduce a FlinkStatistic class, which is a subclass of Calcite's Statistic.
 2. Integrate FlinkStatistic with FlinkTable.
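
A minimal sketch of such a subclass (illustrative only; the constructor and the defaults below are assumptions, not the final design), built on Calcite's Statistic interface:

    import java.util.Collections
    import java.util.{List => JList}

    import org.apache.calcite.rel.{RelCollation, RelDistribution, RelDistributions}
    import org.apache.calcite.schema.Statistic
    import org.apache.calcite.util.ImmutableBitSet

    // Wraps an optional row count and exposes it through Calcite's Statistic interface.
    class FlinkStatistic(rowCount: Option[Double]) extends Statistic {
      override def getRowCount(): java.lang.Double = rowCount.map(Double.box).orNull
      override def isKey(columns: ImmutableBitSet): Boolean = false
      override def getCollations(): JList[RelCollation] = Collections.emptyList()
      override def getDistribution(): RelDistribution = RelDistributions.ANY
    }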

> Introduce and migrate current table statistics to FlinkStatistics
> -
>
> Key: FLINK-5567
> URL: https://issues.apache.org/jira/browse/FLINK-5567
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: zhangjing
>
> 1. Introduce a FlinkStatistic class, which is a subclass of Calcite's Statistic.
>  2. Integrate FlinkStatistic with FlinkTable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5505) Harmonize ZooKeeper configuration parameters

2017-01-19 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5505:
-
Assignee: (was: Till Rohrmann)

> Harmonize ZooKeeper configuration parameters
> 
>
> Key: FLINK-5505
> URL: https://issues.apache.org/jira/browse/FLINK-5505
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Priority: Trivial
> Fix For: 1.3.0
>
>
> Since Flink users don't necessarily know all of the Mesos terminology, and the 
> JobManager also runs as a task, I would like to rename some of Flink's Mesos 
> configuration parameters. I propose the following changes:
> {{mesos.initial-tasks}} => {{mesos.initial-taskmanagers}}
> {{mesos.maximum-failed-tasks}} => {{mesos.maximum-failed-taskmanagers}}
> {{mesos.resourcemanager.artifactserver.*}} => {{mesos.artifactserver.*}}
> {{mesos.resourcemanager.framework.*}} => {{mesos.framework.*}}
> {{mesos.resourcemanager.tasks.*}} => {{mesos.taskmanager.*}}
> {{recovery.zookeeper.path.mesos-workers}} => 
> {{mesos.high-availability.zookeeper.path.mesos-workers}}
> What do you think [~eronwright]?
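
As a hedged illustration only (the key name follows the proposal above; reading it through Flink's Configuration class and the default value of 1 are assumptions made for this example):

    import org.apache.flink.configuration.Configuration

    // Illustrative: code that would read the proposed key instead of the old mesos.initial-tasks.
    val config = new Configuration()
    val initialTaskManagers = config.getInteger("mesos.initial-taskmanagers", 1)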





[jira] [Updated] (FLINK-5505) Harmonize ZooKeeper configuration parameters

2017-01-19 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5505:
-
Description: 
Since Flink users don't necessarily know all of the Mesos terminology, and the 
JobManager also runs as a task, I would like to rename some of Flink's Mesos 
configuration parameters. I propose the following changes:

{{mesos.initial-tasks}} => {{mesos.initial-taskmanagers}}
{{mesos.maximum-failed-tasks}} => {{mesos.maximum-failed-taskmanagers}}
{{mesos.resourcemanager.artifactserver.*}} => {{mesos.artifactserver.*}}
{{mesos.resourcemanager.framework.*}} => {{mesos.framework.*}}
{{mesos.resourcemanager.tasks.*}} => {{mesos.taskmanager.*}}

{{recovery.zookeeper.path.mesos-workers}} => 
{{mesos.high-availability.zookeeper.path.mesos-workers}}

What do you think [~eronwright]?

  was:In order to harmonize configuration parameter names I think we should 
rename {{recovery.zookeeper.path.mesos-workers}} into 
{{high-availability.zookeeper.path.mesos-workers}}.


> Harmonize ZooKeeper configuration parameters
> 
>
> Key: FLINK-5505
> URL: https://issues.apache.org/jira/browse/FLINK-5505
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Trivial
> Fix For: 1.3.0
>
>
> Since Flink users don't necessarily know all of the Mesos terminology, and the 
> JobManager also runs as a task, I would like to rename some of Flink's Mesos 
> configuration parameters. I propose the following changes:
> {{mesos.initial-tasks}} => {{mesos.initial-taskmanagers}}
> {{mesos.maximum-failed-tasks}} => {{mesos.maximum-failed-taskmanagers}}
> {{mesos.resourcemanager.artifactserver.*}} => {{mesos.artifactserver.*}}
> {{mesos.resourcemanager.framework.*}} => {{mesos.framework.*}}
> {{mesos.resourcemanager.tasks.*}} => {{mesos.taskmanager.*}}
> {{recovery.zookeeper.path.mesos-workers}} => 
> {{mesos.high-availability.zookeeper.path.mesos-workers}}
> What do you think [~eronwright]?





[jira] [Updated] (FLINK-5505) Harmonize ZooKeeper configuration parameters

2017-01-19 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5505:
-
Description: 
Since Flink users don't necessarily know all of the Mesos terminology, and the 
JobManager also runs as a task, I would like to rename some of Flink's Mesos 
configuration parameters. I propose the following changes:

{{mesos.initial-tasks}} => {{mesos.initial-taskmanagers}}
{{mesos.maximum-failed-tasks}} => {{mesos.maximum-failed-taskmanagers}}
{{mesos.resourcemanager.artifactserver.\*}} => {{mesos.artifactserver.*}}
{{mesos.resourcemanager.framework.*}} => {{mesos.framework.*}}
{{mesos.resourcemanager.tasks.*}} => {{mesos.taskmanager.*}}

{{recovery.zookeeper.path.mesos-workers}} => 
{{mesos.high-availability.zookeeper.path.mesos-workers}}

What do you think [~eronwright]?

  was:
Since Flink users don't necessarily know all of the Mesos terminology, and the 
JobManager also runs as a task, I would like to rename some of Flink's Mesos 
configuration parameters. I propose the following changes:

{{mesos.initial-tasks}} => {{mesos.initial-taskmanagers}}
{{mesos.maximum-failed-tasks}} => {{mesos.maximum-failed-taskmanagers}}
{{mesos.resourcemanager.artifactserver.*}} => {{mesos.artifactserver.*}}
{{mesos.resourcemanager.framework.*}} => {{mesos.framework.*}}
{{mesos.resourcemanager.tasks.*}} => {{mesos.taskmanager.*}}

{{recovery.zookeeper.path.mesos-workers}} => 
{{mesos.high-availability.zookeeper.path.mesos-workers}}

What do you think [~eronwright]?


> Harmonize ZooKeeper configuration parameters
> 
>
> Key: FLINK-5505
> URL: https://issues.apache.org/jira/browse/FLINK-5505
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Priority: Trivial
> Fix For: 1.3.0
>
>
> Since Flink users don't necessarily know all of the Mesos terminology, and the 
> JobManager also runs as a task, I would like to rename some of Flink's Mesos 
> configuration parameters. I propose the following changes:
> {{mesos.initial-tasks}} => {{mesos.initial-taskmanagers}}
> {{mesos.maximum-failed-tasks}} => {{mesos.maximum-failed-taskmanagers}}
> {{mesos.resourcemanager.artifactserver.\*}} => {{mesos.artifactserver.*}}
> {{mesos.resourcemanager.framework.*}} => {{mesos.framework.*}}
> {{mesos.resourcemanager.tasks.*}} => {{mesos.taskmanager.*}}
> {{recovery.zookeeper.path.mesos-workers}} => 
> {{mesos.high-availability.zookeeper.path.mesos-workers}}
> What do you think [~eronwright]?





[jira] [Updated] (FLINK-5505) Harmonize ZooKeeper configuration parameters

2017-01-19 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5505:
-
Description: 
Since Flink users don't necessarily know all of the Mesos terminology, and the 
JobManager also runs as a task, I would like to rename some of Flink's Mesos 
configuration parameters. I propose the following changes:

{{mesos.initial-tasks}} => {{mesos.initial-taskmanagers}}
{{mesos.maximum-failed-tasks}} => {{mesos.maximum-failed-taskmanagers}}
{{mesos.resourcemanager.artifactserver.\*}} => {{mesos.artifactserver.*}}
{{mesos.resourcemanager.framework.\*}} => {{mesos.framework.*}}
{{mesos.resourcemanager.tasks.\*}} => {{mesos.taskmanager.*}}

{{recovery.zookeeper.path.mesos-workers}} => 
{{mesos.high-availability.zookeeper.path.mesos-workers}}

What do you think [~eronwright]?

  was:
Since Flink users don't necessarily know all of the Mesos terminology, and the 
JobManager also runs as a task, I would like to rename some of Flink's Mesos 
configuration parameters. I propose the following changes:

{{mesos.initial-tasks}} => {{mesos.initial-taskmanagers}}
{{mesos.maximum-failed-tasks}} => {{mesos.maximum-failed-taskmanagers}}
{{mesos.resourcemanager.artifactserver.\*}} => {{mesos.artifactserver.*}}
{{mesos.resourcemanager.framework.*}} => {{mesos.framework.*}}
{{mesos.resourcemanager.tasks.*}} => {{mesos.taskmanager.*}}

{{recovery.zookeeper.path.mesos-workers}} => 
{{mesos.high-availability.zookeeper.path.mesos-workers}}

What do you think [~eronwright]?


> Harmonize ZooKeeper configuration parameters
> 
>
> Key: FLINK-5505
> URL: https://issues.apache.org/jira/browse/FLINK-5505
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Priority: Trivial
> Fix For: 1.3.0
>
>
> Since Flink users don't necessarily know all of the Mesos terminology, and the 
> JobManager also runs as a task, I would like to rename some of Flink's Mesos 
> configuration parameters. I propose the following changes:
> {{mesos.initial-tasks}} => {{mesos.initial-taskmanagers}}
> {{mesos.maximum-failed-tasks}} => {{mesos.maximum-failed-taskmanagers}}
> {{mesos.resourcemanager.artifactserver.\*}} => {{mesos.artifactserver.*}}
> {{mesos.resourcemanager.framework.\*}} => {{mesos.framework.*}}
> {{mesos.resourcemanager.tasks.\*}} => {{mesos.taskmanager.*}}
> {{recovery.zookeeper.path.mesos-workers}} => 
> {{mesos.high-availability.zookeeper.path.mesos-workers}}
> What do you think [~eronwright]?





[jira] [Commented] (FLINK-5546) When multiple users run test, /tmp/cacheFile conflicts

2017-01-19 Thread Syinchwun Leo (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829575#comment-15829575
 ] 

Syinchwun Leo commented on FLINK-5546:
--

It seems that when running the tests, the temp file is created in the JVM's default 
temp directory. When using Maven, passing -Djava.io.tmpdir=/path/to/tmpdir can 
redirect the temp folder. 
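
A small Scala sketch of the behaviour described above (the object name is made up for illustration; java.io.tmpdir is the real JVM property):

    import java.io.File

    object TmpDirExample {
      def main(args: Array[String]): Unit = {
        // The cache file is resolved under the JVM's default temp directory (/tmp on Linux),
        // so two users on the same machine compete for the same /tmp/cacheFile.
        val tmpDir = System.getProperty("java.io.tmpdir")
        val cacheFile = new File(tmpDir, "cacheFile")
        println(s"cache file would be created at: ${cacheFile.getAbsolutePath}")
        // A per-user directory avoids the clash, e.g.:
        //   mvn test -Djava.io.tmpdir=/home/<user>/tmp
      }
    }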

> When multiple users run test, /tmp/cacheFile conflicts
> --
>
> Key: FLINK-5546
> URL: https://issues.apache.org/jira/browse/FLINK-5546
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.2.0, 1.3.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>
> When multiple Linux users run the tests at the same time, the flink-runtime module may 
> fail. User A creates /tmp/cacheFile, and User B then has no permission to 
> access the file.  
> Failed tests: 
> FileCacheDeleteValidationTest.setup:79 Error initializing the test: 
> /tmp/cacheFile (Permission denied)
> Tests in error: 
> IOManagerTest.channelEnumerator:54 » Runtime Could not create storage 
> director...
> Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8





[jira] [Comment Edited] (FLINK-5546) When multiple users run test, /tmp/cacheFile conflicts

2017-01-19 Thread Syinchwun Leo (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829575#comment-15829575
 ] 

Syinchwun Leo edited comment on FLINK-5546 at 1/19/17 9:22 AM:
---

It seems that when running the tests, the temp file is created in the JVM's default 
temp directory. When using Maven, passing -Djava.io.tmpdir=/path/to/tmpdir can 
redirect the temp folder. 


was (Author: syinchwunleo):
It seems that when running the tests, the temp file is created in the JVM's default 
temp directory. When using Maven, passing -Djava.io.tmpdir=/path/to/tmpdir can 
redirect the temp folder. 

> When multiple users run test, /tmp/cacheFile conflicts
> --
>
> Key: FLINK-5546
> URL: https://issues.apache.org/jira/browse/FLINK-5546
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.2.0, 1.3.0
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>
> When multiple Linux users run the tests at the same time, the flink-runtime module may 
> fail. User A creates /tmp/cacheFile, and User B then has no permission to 
> access the file.  
> Failed tests: 
> FileCacheDeleteValidationTest.setup:79 Error initializing the test: 
> /tmp/cacheFile (Permission denied)
> Tests in error: 
> IOManagerTest.channelEnumerator:54 » Runtime Could not create storage 
> director...
> Tests run: 1385, Failures: 1, Errors: 1, Skipped: 8





[jira] [Commented] (FLINK-1750) Add canonical correlation analysis (CCA) to machine learning library

2017-01-19 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829584#comment-15829584
 ] 

Till Rohrmann commented on FLINK-1750:
--

Hi [~kateri],

great to hear that you're working on this feature :-)

There wasn't a specific use case intended to be solved by this issue. Thus, it 
would be great to implement it as a general-purpose method where you can provide 
samples of two random vectors and then do the dependency reduction after 
you've learned the covariance matrix. Maybe you could take a look at how 
scikit-learn does it; they usually have a really good abstraction. 

There wasn't a customer requesting this feature. I opened it because I thought 
it would be a valuable transformer in your ML pipeline.

I hope this helps to answer your questions.
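
Purely as an illustration of such a general-purpose interface (the class and method names below are assumptions, not an existing FlinkML API):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.math.DenseVector

    // Hypothetical API sketch: fit on paired samples of two random vectors, then project
    // new samples onto the learned canonical directions.
    class CanonicalCorrelationAnalysis(k: Int) {

      def fit(x: DataSet[DenseVector], y: DataSet[DenseVector]): this.type = {
        // 1. estimate the covariance matrices C_xx, C_yy and the cross-covariance C_xy
        // 2. solve the resulting generalized eigenproblem for the top-k canonical directions
        this
      }

      def transform(x: DataSet[DenseVector]): DataSet[DenseVector] = {
        // project each sample onto the learned directions; identity here, as this is only a sketch
        x
      }
    }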

> Add canonical correlation analysis (CCA) to machine learning library
> 
>
> Key: FLINK-1750
> URL: https://issues.apache.org/jira/browse/FLINK-1750
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Kate Eri
>  Labels: ML
>
> Canonical correlation analysis (CCA) [1] can be used to find correlated 
> features between two random variables. Moreover, CCA can be used for 
> dimensionality reduction.
> Maybe the work of Jia Chen and Ioannis D. Schizas [2] can be adapted to 
> realize a distributed CCA with Flink. 
> Resources:
> [1] [http://en.wikipedia.org/wiki/Canonical_correlation]
> [2] [http://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=6810359]





[GitHub] flink pull request #3136: [FLINK-5512] [doc] Improve RabbitMQ documentation

2017-01-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3136




[jira] [Commented] (FLINK-5512) RabbitMQ documentation should inform that exactly-once holds for RMQSource only when parallelism is 1

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829592#comment-15829592
 ] 

ASF GitHub Bot commented on FLINK-5512:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3136


> RabbitMQ documentation should inform that exactly-once holds for RMQSource 
> only when parallelism is 1  
> ---
>
> Key: FLINK-5512
> URL: https://issues.apache.org/jira/browse/FLINK-5512
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> See here for the reasoning: FLINK-2624. We should add an informative warning 
> about this limitation in the docs.
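
A hedged sketch of the caveat in code form (the connection settings and queue name are placeholders, not taken from the docs change itself): the source must run non-parallel for the exactly-once guarantee to apply.

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object RMQExactlyOnceSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.enableCheckpointing(5000) // exactly-once also requires checkpointing

        val connectionConfig = new RMQConnectionConfig.Builder()
          .setHost("localhost").setPort(5672)
          .setUserName("guest").setPassword("guest").setVirtualHost("/")
          .build()

        env
          .addSource(new RMQSource[String](connectionConfig, "queueName", true, new SimpleStringSchema))
          .setParallelism(1) // exactly-once only holds for a non-parallel RMQSource
          .print()

        env.execute("RMQSource parallelism-1 sketch")
      }
    }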





[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96811063
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -226,27 +225,101 @@ object AggregateUtil {
   aggregates,
   groupingOffsetMapping,
   aggOffsetMapping,
-  intermediateRowArity,
+  intermediateRowArity + 1,// the addition one field is used to 
store time attribute
   outputType.getFieldCount)
+
+  case EventTimeSessionGroupWindow(_, _, gap) =>
+val (startPos, endPos) = if (isTimeWindow(window)) {
--- End diff --

Aren't session windows always time windows?




[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96821333
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
@@ -218,6 +216,85 @@ class DataSetWindowAggregate(
 }
   }
 
+  private[this] def createEventTimeSessionWindowDataSet(
+inputDS: DataSet[Any],
+isParserCaseSensitive: Boolean): DataSet[Any] = {
+
+val groupingKeys = grouping.indices.toArray
+val rowTypeInfo = resultRowTypeInfo
+
+// grouping window
+if (groupingKeys.length > 0) {
+  //create mapFunction for initializing the aggregations
+  val mapFunction = createDataSetWindowPrepareMapFunction(
+window,
+namedAggregates,
+grouping,
+inputType,isParserCaseSensitive)
+
+  // create groupReduceFunction for calculating the aggregations
+  val groupReduceFunction = 
createDataSetWindowAggregationGroupReduceFunction(
+window,
+namedAggregates,
+inputType,
+rowRelDataType,
+grouping,
+namedProperties)
+
+  val mappedInput =
+inputDS
+.map(mapFunction)
+.name(prepareOperatorName)
+
+  val mapReturnType = 
mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
+
+  // the position of the rowtime field in the intermediate result for 
map output
+  val rowTimeFilePos = mapReturnType.getArity - 1
--- End diff --

should be `rowTimeFieldPos`?




[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96818876
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+
+/**
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is 
used for Session time-window
+  * on batch.
+  * Note:
+  *  This can handle two input types:
+  *  1. when partial aggregate is not supported, the input data structure 
of reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
+  *  2. when partial aggregate is supported, the input data structure of 
reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity The intermediate row field count.
+  * @param finalRowArity The output row field count.
+  * @param finalRowWindowStartPos The relative window-start field position.
+  * @param finalRowWindowEndPos The relative window-end field position.
+  * @param gap Session time window gap.
+  */
+class DataSetSessionWindowAggregateReduceGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupKeysMapping: Array[(Int, Int)],
+aggregateMapping: Array[(Int, Int)],
+intermediateRowArity: Int,
+finalRowArity: Int,
+finalRowWindowStartPos: Option[Int],
+finalRowWindowEndPos: Option[Int],
+gap:Long)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var collector: TimeWindowPropertyCollector = _
+  private var intermediateRowWindowStartPos = 0
+  private var intermediateRowWindowEndPos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+aggregateBuffer = new Row(intermediateRowArity)
+intermediateRowWindowStartPos = intermediateRowArity - 2
+intermediateRowWindowEndPos = intermediateRowArity - 1
+output = new Row(finalRowArity)
+collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, 
finalRowWindowEndPos)
+  }
+
+  /**
+* For grouped intermediate aggregate Rows, divide window according to 
the window-start
+* and window-end, merge data (within a unified window) into an 
aggregate buffer, calculate
+* aggregated values output from aggregate buffer, and then set them 
into output
+* Row based on the mapping relationship between intermediate aggregate 
data and output data.
+*
+* @param records Grouped intermediate aggregate Rows iterator.
+* @param out The collector to hand results to.
+*
+*/
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
+
+var last: Row = null
+var head: Row = null
+var lastWindowEnd: java.lang.Long = null
+var currentWindowStart:java.lang.Long  = null
+
+val iterator = records.iterator()
+

[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96812620
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
 ---
@@ -119,4 +119,35 @@ class DataSetWindowAggregateITCase(
 val results = windowedTable.toDataSet[Row].collect()
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testEventTimeSessionGroupWindow(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 
'string)
+val windowedTable = table
+  .groupBy('string)
+  .window(Session withGap 7.milli on 'long as 'w)
+  .select('string, 'string.count, 'w.start, 'w.end)
+
+val results = windowedTable.toDataSet[Row].collect()
+
+val expected = "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 
00:00:00.009\nHello world,1," +
--- End diff --

can you break the lines at the end of the records? Makes it easier to read 
the expected data.
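
For instance, something along these lines (only the first record is spelled out here; the rest is elided):

    // Illustrative formatting only: one record per source line.
    val expected =
      "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009\n" +
      "Hello world,1,..." // remaining records continue in the same one-record-per-line style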




[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96810332
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
@@ -128,10 +128,8 @@ class DataSetWindowAggregate(
   inputDS,
   isTimeInterval(size.resultType),
   caseSensitive)
-
-  case EventTimeSessionGroupWindow(_, _, _) =>
-throw new UnsupportedOperationException(
-  "Event-time session windows in a batch environment are currently 
not supported")
+  case EventTimeSessionGroupWindow(_, _, gap) =>
+createEventTimeSessionWindowDataSet(inputDS,caseSensitive)
--- End diff --

add space: `(inputDS, caseSensitive)`




[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96816754
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupCombineFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+
+/**
+  * This wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupingKeys
+  * @param intermediateRowArity The intermediate row field count.
+  * @param gap  Session time window gap.
+  * @param intermediateRowType Intermediate row data type.
+  */
+class DataSetSessionWindowAggregateCombineGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupingKeys: Array[Int],
+intermediateRowArity: Int,
+gap: Long,
+@transient intermediateRowType: TypeInformation[Row])
+  extends RichGroupCombineFunction[Row,Row] with ResultTypeQueryable[Row] {
+
+  private var aggregateBuffer: Row = _
+  private var rowTimePos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupingKeys)
+aggregateBuffer = new Row(intermediateRowArity)
+rowTimePos = intermediateRowArity - 2
+  }
+
+  /**
+* For sub-grouped intermediate aggregate Rows, divide window based on 
the row-time
+* (current'row-time - previous’row-time > gap), and then merge data 
(within a unified window)
+* into an aggregate buffer.
+*
+* @param records  Sub-grouped intermediate aggregate Rows .
+* @return Combined intermediate aggregate Row.
+*
+*/
+  override def combine(
+records: Iterable[Row],
+out: Collector[Row]): Unit = {
+
+var head:Row = null
--- End diff --

add space




[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96820389
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+
+/**
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is 
used for Session time-window
+  * on batch.
+  * Note:
+  *  This can handle two input types:
+  *  1. when partial aggregate is not supported, the input data structure 
of reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
+  *  2. when partial aggregate is supported, the input data structure of 
reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity The intermediate row field count.
+  * @param finalRowArity The output row field count.
+  * @param finalRowWindowStartPos The relative window-start field position.
+  * @param finalRowWindowEndPos The relative window-end field position.
+  * @param gap Session time window gap.
+  */
+class DataSetSessionWindowAggregateReduceGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupKeysMapping: Array[(Int, Int)],
+aggregateMapping: Array[(Int, Int)],
+intermediateRowArity: Int,
+finalRowArity: Int,
+finalRowWindowStartPos: Option[Int],
+finalRowWindowEndPos: Option[Int],
+gap:Long)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var collector: TimeWindowPropertyCollector = _
+  private var intermediateRowWindowStartPos = 0
+  private var intermediateRowWindowEndPos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+aggregateBuffer = new Row(intermediateRowArity)
+intermediateRowWindowStartPos = intermediateRowArity - 2
+intermediateRowWindowEndPos = intermediateRowArity - 1
+output = new Row(finalRowArity)
+collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, 
finalRowWindowEndPos)
+  }
+
+  /**
+* For grouped intermediate aggregate Rows, divide window according to 
the window-start
+* and window-end, merge data (within a unified window) into an 
aggregate buffer, calculate
+* aggregated values output from aggregate buffer, and then set them 
into output
+* Row based on the mapping relationship between intermediate aggregate 
data and output data.
+*
+* @param records Grouped intermediate aggregate Rows iterator.
+* @param out The collector to hand results to.
+*
+*/
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
+
+var last: Row = null
+var head: Row = null
+var lastWindowEnd: java.lang.Long = null
+var currentWindowStart:java.lang.Long  = null
+
+val iterator = records.iterator()
+

[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96819808
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+
+/**
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is 
used for Session time-window
+  * on batch.
+  * Note:
+  *  This can handle two input types:
+  *  1. when partial aggregate is not supported, the input data structure 
of reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
+  *  2. when partial aggregate is supported, the input data structure of 
reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity The intermediate row field count.
+  * @param finalRowArity The output row field count.
+  * @param finalRowWindowStartPos The relative window-start field position.
+  * @param finalRowWindowEndPos The relative window-end field position.
+  * @param gap Session time window gap.
+  */
+class DataSetSessionWindowAggregateReduceGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupKeysMapping: Array[(Int, Int)],
+aggregateMapping: Array[(Int, Int)],
+intermediateRowArity: Int,
+finalRowArity: Int,
+finalRowWindowStartPos: Option[Int],
+finalRowWindowEndPos: Option[Int],
+gap:Long)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var collector: TimeWindowPropertyCollector = _
+  private var intermediateRowWindowStartPos = 0
+  private var intermediateRowWindowEndPos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+aggregateBuffer = new Row(intermediateRowArity)
+intermediateRowWindowStartPos = intermediateRowArity - 2
+intermediateRowWindowEndPos = intermediateRowArity - 1
+output = new Row(finalRowArity)
+collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, 
finalRowWindowEndPos)
+  }
+
+  /**
+* For grouped intermediate aggregate Rows, divide window according to 
the window-start
+* and window-end, merge data (within a unified window) into an 
aggregate buffer, calculate
+* aggregated values output from aggregate buffer, and then set them 
into output
+* Row based on the mapping relationship between intermediate aggregate 
data and output data.
+*
+* @param records Grouped intermediate aggregate Rows iterator.
+* @param out The collector to hand results to.
+*
+*/
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
+
+var last: Row = null
+var head: Row = null
+var lastWindowEnd: java.lang.Long = null
+var currentWindowStart:java.lang.Long  = null
+
+val iterator = records.iterator()
+

[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96810423
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
@@ -218,6 +216,85 @@ class DataSetWindowAggregate(
 }
   }
 
+  private[this] def createEventTimeSessionWindowDataSet(
+inputDS: DataSet[Any],
+isParserCaseSensitive: Boolean): DataSet[Any] = {
+
+val groupingKeys = grouping.indices.toArray
+val rowTypeInfo = resultRowTypeInfo
+
+// grouping window
+if (groupingKeys.length > 0) {
+  //create mapFunction for initializing the aggregations
+  val mapFunction = createDataSetWindowPrepareMapFunction(
+window,
+namedAggregates,
+grouping,
+inputType,isParserCaseSensitive)
--- End diff --

wrap last argument as well




[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96820016
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+
+/**
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is 
used for Session time-window
+  * on batch.
+  * Note:
+  *  This can handle two input types:
+  *  1. when partial aggregate is not supported, the input data structure 
of reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
+  *  2. when partial aggregate is supported, the input data structure of 
reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity The intermediate row field count.
+  * @param finalRowArity The output row field count.
+  * @param finalRowWindowStartPos The relative window-start field position.
+  * @param finalRowWindowEndPos The relative window-end field position.
+  * @param gap Session time window gap.
+  */
+class DataSetSessionWindowAggregateReduceGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupKeysMapping: Array[(Int, Int)],
+aggregateMapping: Array[(Int, Int)],
+intermediateRowArity: Int,
+finalRowArity: Int,
+finalRowWindowStartPos: Option[Int],
+finalRowWindowEndPos: Option[Int],
+gap:Long)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var collector: TimeWindowPropertyCollector = _
+  private var intermediateRowWindowStartPos = 0
+  private var intermediateRowWindowEndPos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+aggregateBuffer = new Row(intermediateRowArity)
+intermediateRowWindowStartPos = intermediateRowArity - 2
+intermediateRowWindowEndPos = intermediateRowArity - 1
+output = new Row(finalRowArity)
+collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, 
finalRowWindowEndPos)
+  }
+
+  /**
+* For grouped intermediate aggregate Rows, divide window according to 
the window-start
+* and window-end, merge data (within a unified window) into an 
aggregate buffer, calculate
+* aggregated values output from aggregate buffer, and then set them 
into output
+* Row based on the mapping relationship between intermediate aggregate 
data and output data.
+*
+* @param records Grouped intermediate aggregate Rows iterator.
+* @param out The collector to hand results to.
+*
+*/
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
+
+var last: Row = null
+var head: Row = null
+var lastWindowEnd: java.lang.Long = null
+var currentWindowStart:java.lang.Long  = null
+
+val iterator = records.iterator()
+

[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96817585
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupCombineFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+
+/**
+  * This wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupingKeys
+  * @param intermediateRowArity The intermediate row field count.
+  * @param gap  Session time window gap.
+  * @param intermediateRowType Intermediate row data type.
+  */
+class DataSetSessionWindowAggregateCombineGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupingKeys: Array[Int],
+intermediateRowArity: Int,
+gap: Long,
+@transient intermediateRowType: TypeInformation[Row])
+  extends RichGroupCombineFunction[Row,Row] with ResultTypeQueryable[Row] {
+
+  private var aggregateBuffer: Row = _
+  private var rowTimePos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupingKeys)
+aggregateBuffer = new Row(intermediateRowArity)
+rowTimePos = intermediateRowArity - 2
+  }
+
+  /**
+* For sub-grouped intermediate aggregate Rows, divide window based on 
the row-time
+* (current'row-time - previous’row-time > gap), and then merge data 
(within a unified window)
+* into an aggregate buffer.
+*
+* @param records  Sub-grouped intermediate aggregate Rows .
+* @return Combined intermediate aggregate Row.
+*
+*/
+  override def combine(
+records: Iterable[Row],
+out: Collector[Row]): Unit = {
+
+var head:Row = null
+var lastRowTime: java.lang.Long = null
+var currentRowTime: java.lang.Long = null
+
+val iterator = records.iterator()
+
+while (iterator.hasNext) {
+  val record = iterator.next()
+currentRowTime = record.getField(rowTimePos).asInstanceOf[Long]
+
+// initial traversal or opening a new window
+// the session window end is equal to last row-time + gap .
+if (null == lastRowTime ||
+  (null != lastRowTime && (currentRowTime > (lastRowTime + gap)))) {
+
+  // calculate the current window and open a new window.
+  if (null != lastRowTime) {
+// emit the current window's merged data
+doCollect(out, head, lastRowTime)
+  } else {
+// set group keys to aggregateBuffer.
+for (i <- 0 until groupingKeys.length) {
+  aggregateBuffer.setField(i, record.getField(i))
+}
+  }
+
+  // initiate intermediate aggregate value.
+  aggregates.foreach(_.initiate(aggregateBuffer))
+  head = record
--- End diff --

Do not remember an object that you received from the combine (or reduce) 
iterator. The iterator may repeatedly serve the same mutable object. If we only 
need the timestamp, we should remember only this in a `long`. Or we set the 
start time immediately in the `aggregateBuffer`.

See also object reuse mode: 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.htm
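
A sketch of the suggested change, reusing the names from the diff above (the doCollect variant that takes the window start as a Long is a hypothetical signature change): keep only the primitive timestamp across iterations, never the Row handed out by the iterator.

    // The iterator may serve the same mutable Row object for every element,
    // so remember the window start as a primitive instead of holding on to `head`.
    var windowStart: Long = -1L
    var lastRowTime: Long = -1L

    val iterator = records.iterator()
    while (iterator.hasNext) {
      val record = iterator.next()
      val currentRowTime = record.getField(rowTimePos).asInstanceOf[Long]

      if (windowStart < 0 || currentRowTime > lastRowTime + gap) {
        if (windowStart >= 0) {
          doCollect(out, windowStart, lastRowTime)  // emit the finished window
        }
        aggregates.foreach(_.initiate(aggregateBuffer))
        windowStart = currentRowTime                // keep the value, not the record
      }
      // ... merge `record` into `aggregateBuffer` as before ...
      lastRowTime = currentRowTime
    }
    // after the loop: emit the final window using windowStart / lastRowTime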

[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96815166
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupCombineFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+
+/**
+  * This wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupingKeys
+  * @param intermediateRowArity The intermediate row field count.
+  * @param gap  Session time window gap.
+  * @param intermediateRowType Intermediate row data type.
+  */
+class DataSetSessionWindowAggregateCombineGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupingKeys: Array[Int],
+intermediateRowArity: Int,
+gap: Long,
+@transient intermediateRowType: TypeInformation[Row])
+  extends RichGroupCombineFunction[Row,Row] with ResultTypeQueryable[Row] {
+
+  private var aggregateBuffer: Row = _
+  private var rowTimePos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupingKeys)
+aggregateBuffer = new Row(intermediateRowArity)
+rowTimePos = intermediateRowArity - 2
+  }
+
+  /**
+* For sub-grouped intermediate aggregate Rows, divide window based on 
the row-time
+* (current'row-time - previous’row-time > gap), and then merge data 
(within a unified window)
+* into an aggregate buffer.
+*
+* @param records  Sub-grouped intermediate aggregate Rows .
+* @return Combined intermediate aggregate Row.
+*
+*/
+  override def combine(
+records: Iterable[Row],
+out: Collector[Row]): Unit = {
+
+var head:Row = null
+var lastRowTime: java.lang.Long = null
+var currentRowTime: java.lang.Long = null
+
+val iterator = records.iterator()
+
+while (iterator.hasNext) {
+  val record = iterator.next()
+currentRowTime = record.getField(rowTimePos).asInstanceOf[Long]
--- End diff --

wrong indention?




[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96818855
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+
+/**
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is 
used for Session time-window
+  * on batch.
+  * Note:
+  *  This can handle two input types:
+  *  1. when partial aggregate is not supported, the input data structure 
of reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
+  *  2. when partial aggregate is supported, the input data structure of 
reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity The intermediate row field count.
+  * @param finalRowArity The output row field count.
+  * @param finalRowWindowStartPos The relative window-start field position.
+  * @param finalRowWindowEndPos The relative window-end field position.
+  * @param gap Session time window gap.
+  */
+class DataSetSessionWindowAggregateReduceGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupKeysMapping: Array[(Int, Int)],
+aggregateMapping: Array[(Int, Int)],
+intermediateRowArity: Int,
+finalRowArity: Int,
+finalRowWindowStartPos: Option[Int],
+finalRowWindowEndPos: Option[Int],
+gap:Long)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var collector: TimeWindowPropertyCollector = _
+  private var intermediateRowWindowStartPos = 0
+  private var intermediateRowWindowEndPos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+aggregateBuffer = new Row(intermediateRowArity)
+intermediateRowWindowStartPos = intermediateRowArity - 2
+intermediateRowWindowEndPos = intermediateRowArity - 1
+output = new Row(finalRowArity)
+collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, 
finalRowWindowEndPos)
+  }
+
+  /**
+* For grouped intermediate aggregate Rows, divide window according to 
the window-start
+* and window-end, merge data (within a unified window) into an 
aggregate buffer, calculate
+* aggregated values output from aggregate buffer, and then set them 
into output
+* Row based on the mapping relationship between intermediate aggregate 
data and output data.
+*
+* @param records Grouped intermediate aggregate Rows iterator.
+* @param out The collector to hand results to.
+*
+*/
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
+
+var last: Row = null
+var head: Row = null
+var lastWindowEnd: java.lang.Long = null
+var currentWindowStart:java.lang.Long  = null
+
+val iterator = records.iterator()
+

[jira] [Commented] (FLINK-5512) RabbitMQ documentation should inform that exactly-once holds for RMQSource only when parallelism is 1

2017-01-19 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829607#comment-15829607
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5512:


Resolved for master with bf72a7c028cdb189c8646720ae6f7bbd1d301749.
Resolved for 1.2 with 11ce80bc557308cf891152e3fe66e124471a6bf6.

> RabbitMQ documentation should inform that exactly-once holds for RMQSource 
> only when parallelism is 1  
> ---
>
> Key: FLINK-5512
> URL: https://issues.apache.org/jira/browse/FLINK-5512
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> See here for the reasoning: FLINK-2624. We should add an informative warning 
> about this limitation in the docs.
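
For reference, a minimal sketch of a job that pins the RMQSource to parallelism 1 (host, credentials and queue name are placeholders; checkpointing also has to be enabled for the exactly-once guarantee to apply):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object RmqExactlyOnceSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.enableCheckpointing(5000)        // exactly-once also requires checkpointing

        val connectionConfig = new RMQConnectionConfig.Builder()
          .setHost("localhost").setPort(5672)
          .setUserName("guest").setPassword("guest").setVirtualHost("/")
          .build()

        val stream = env
          .addSource(new RMQSource[String](
            connectionConfig,
            "flink-queue",            // placeholder queue name
            true,                     // use correlation ids to deduplicate on restore
            new SimpleStringSchema))
          .setParallelism(1)          // exactly-once only holds for a non-parallel source

        stream.print()
        env.execute("RMQ exactly-once sketch")
      }
    }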



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829596#comment-15829596
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96817585
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupCombineFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+
+/**
+  * This wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupingKeys
+  * @param intermediateRowArity The intermediate row field count.
+  * @param gap  Session time window gap.
+  * @param intermediateRowType Intermediate row data type.
+  */
+class DataSetSessionWindowAggregateCombineGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupingKeys: Array[Int],
+intermediateRowArity: Int,
+gap: Long,
+@transient intermediateRowType: TypeInformation[Row])
+  extends RichGroupCombineFunction[Row,Row] with ResultTypeQueryable[Row] {
+
+  private var aggregateBuffer: Row = _
+  private var rowTimePos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupingKeys)
+aggregateBuffer = new Row(intermediateRowArity)
+rowTimePos = intermediateRowArity - 2
+  }
+
+  /**
+* For sub-grouped intermediate aggregate Rows, divide window based on 
the row-time
+* (current'row-time - previous’row-time > gap), and then merge data 
(within a unified window)
+* into an aggregate buffer.
+*
+* @param records  Sub-grouped intermediate aggregate Rows .
+* @return Combined intermediate aggregate Row.
+*
+*/
+  override def combine(
+records: Iterable[Row],
+out: Collector[Row]): Unit = {
+
+var head:Row = null
+var lastRowTime: java.lang.Long = null
+var currentRowTime: java.lang.Long = null
+
+val iterator = records.iterator()
+
+while (iterator.hasNext) {
+  val record = iterator.next()
+currentRowTime = record.getField(rowTimePos).asInstanceOf[Long]
+
+// initial traversal or opening a new window
+// the session window end is equal to last row-time + gap .
+if (null == lastRowTime ||
+  (null != lastRowTime && (currentRowTime > (lastRowTime + gap 
{
+
+  // calculate the current window and open a new window.
+  if (null != lastRowTime) {
+// emit the current window's merged data
+doCollect(out, head, lastRowTime)
+  } else {
+// set group keys to aggregateBuffer.
+for (i <- 0 until groupingKeys.length) {
+  aggregateBuffer.setField(i, record.getField(i))
+}
+  }
+
+  // initiate intermediate aggregate value.
+  aggregates.foreach(_.initiate(aggregateBuffer))
+  head = record
--- End diff --

Do not remember an object that you received from the combine (or reduce) 
iterator. The iterator may repeatedly serve the same mutable object. I
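
For context: the GroupCombine / GroupReduce iterator may serve the same mutable Row instance on every next() call, so a plain reference like `head = record` can change under you later in the loop. A minimal sketch of the usual remedy, copying the Row before keeping it (the helper name is illustrative, not part of the PR):

    import org.apache.flink.types.Row

    object RowCopySketch {
      // Copy a Row field by field so the kept value is independent of the
      // (possibly reused) object served by the iterator.
      def copyRow(record: Row): Row = {
        val copy = new Row(record.getArity)
        var i = 0
        while (i < record.getArity) {
          copy.setField(i, record.getField(i))
          i += 1
        }
        copy
      }

      def main(args: Array[String]): Unit = {
        val reused = new Row(2)
        reused.setField(0, "key")
        reused.setField(1, Long.box(1L))
        val head = copyRow(reused)          // safe to keep across iterations
        reused.setField(1, Long.box(2L))    // the iterator mutates the reused object...
        println(head.getField(1))           // ...but the copy still holds 1
      }
    }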

[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829600#comment-15829600
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96818876
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+
+/**
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is 
used for Session time-window
+  * on batch.
+  * Note:
+  *  This can handle two input types:
+  *  1. when partial aggregate is not supported, the input data structure 
of reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
+  *  2. when partial aggregate is supported, the input data structure of 
reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity The intermediate row field count.
+  * @param finalRowArity The output row field count.
+  * @param finalRowWindowStartPos The relative window-start field position.
+  * @param finalRowWindowEndPos The relative window-end field position.
+  * @param gap Session time window gap.
+  */
+class DataSetSessionWindowAggregateReduceGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupKeysMapping: Array[(Int, Int)],
+aggregateMapping: Array[(Int, Int)],
+intermediateRowArity: Int,
+finalRowArity: Int,
+finalRowWindowStartPos: Option[Int],
+finalRowWindowEndPos: Option[Int],
+gap:Long)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var collector: TimeWindowPropertyCollector = _
+  private var intermediateRowWindowStartPos = 0
+  private var intermediateRowWindowEndPos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+aggregateBuffer = new Row(intermediateRowArity)
+intermediateRowWindowStartPos = intermediateRowArity - 2
+intermediateRowWindowEndPos = intermediateRowArity - 1
+output = new Row(finalRowArity)
+collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, 
finalRowWindowEndPos)
+  }
+
+  /**
+* For grouped intermediate aggregate Rows, divide window according to 
the window-start
+* and window-end, merge data (within a unified window) into an 
aggregate buffer, calculate
+* aggregated values output from aggregate buffer, and then set them 
into output
+* Row based on the mapping relationship between intermediate aggregate 
data and output data.
+*
+* @param records Grouped intermediate aggregate Rows iterator.
+* @param out The collector to hand results to.
+*
+*/
+  override def reduce(records: Iterable[Row], ou

[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829601#comment-15829601
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96818855
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+
+/**
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is 
used for Session time-window
+  * on batch.
+  * Note:
+  *  This can handle two input types:
+  *  1. when partial aggregate is not supported, the input data structure 
of reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
+  *  2. when partial aggregate is supported, the input data structure of 
reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity The intermediate row field count.
+  * @param finalRowArity The output row field count.
+  * @param finalRowWindowStartPos The relative window-start field position.
+  * @param finalRowWindowEndPos The relative window-end field position.
+  * @param gap Session time window gap.
+  */
+class DataSetSessionWindowAggregateReduceGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupKeysMapping: Array[(Int, Int)],
+aggregateMapping: Array[(Int, Int)],
+intermediateRowArity: Int,
+finalRowArity: Int,
+finalRowWindowStartPos: Option[Int],
+finalRowWindowEndPos: Option[Int],
+gap:Long)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var collector: TimeWindowPropertyCollector = _
+  private var intermediateRowWindowStartPos = 0
+  private var intermediateRowWindowEndPos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+aggregateBuffer = new Row(intermediateRowArity)
+intermediateRowWindowStartPos = intermediateRowArity - 2
+intermediateRowWindowEndPos = intermediateRowArity - 1
+output = new Row(finalRowArity)
+collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, 
finalRowWindowEndPos)
+  }
+
+  /**
+* For grouped intermediate aggregate Rows, divide window according to 
the window-start
+* and window-end, merge data (within a unified window) into an 
aggregate buffer, calculate
+* aggregated values output from aggregate buffer, and then set them 
into output
+* Row based on the mapping relationship between intermediate aggregate 
data and output data.
+*
+* @param records Grouped intermediate aggregate Rows iterator.
+* @param out The collector to hand results to.
+*
+*/
+  override def reduce(records: Iterable[Row], ou

[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829597#comment-15829597
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96811063
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -226,27 +225,101 @@ object AggregateUtil {
   aggregates,
   groupingOffsetMapping,
   aggOffsetMapping,
-  intermediateRowArity,
+  intermediateRowArity + 1,// the addition one field is used to 
store time attribute
   outputType.getFieldCount)
+
+  case EventTimeSessionGroupWindow(_, _, gap) =>
+val (startPos, endPos) = if (isTimeWindow(window)) {
--- End diff --

Aren't session windows always time windows?
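
A tiny self-contained illustration of the point: if session group-windows are always time-based, the window-start / window-end positions follow directly from the arity of the intermediate row, so no isTimeWindow branch is needed (the helper below is illustrative, not the PR's actual code):

    object SessionWindowPositionsSketch {
      // Intermediate layout |keys...|aggregates...|windowStart|windowEnd|:
      // the two window fields are always the last two positions.
      def startEndPositions(intermediateRowArity: Int): (Int, Int) =
        (intermediateRowArity - 2, intermediateRowArity - 1)

      def main(args: Array[String]): Unit = {
        // e.g. |groupKey|sum|count|windowStart|windowEnd| => arity 5 => (3, 4)
        println(startEndPositions(5))
      }
    }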


> Add session group-windows for batch tables
> ---
>
> Key: FLINK-4693
> URL: https://issues.apache.org/jira/browse/FLINK-4693
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829603#comment-15829603
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96816754
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupCombineFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+
+/**
+  * This wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupingKeys
+  * @param intermediateRowArity The intermediate row field count.
+  * @param gap  Session time window gap.
+  * @param intermediateRowType Intermediate row data type.
+  */
+class DataSetSessionWindowAggregateCombineGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupingKeys: Array[Int],
+intermediateRowArity: Int,
+gap: Long,
+@transient intermediateRowType: TypeInformation[Row])
+  extends RichGroupCombineFunction[Row,Row] with ResultTypeQueryable[Row] {
+
+  private var aggregateBuffer: Row = _
+  private var rowTimePos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupingKeys)
+aggregateBuffer = new Row(intermediateRowArity)
+rowTimePos = intermediateRowArity - 2
+  }
+
+  /**
+* For sub-grouped intermediate aggregate Rows, divide window based on 
the row-time
+* (current'row-time - previous’row-time > gap), and then merge data 
(within a unified window)
+* into an aggregate buffer.
+*
+* @param records  Sub-grouped intermediate aggregate Rows .
+* @return Combined intermediate aggregate Row.
+*
+*/
+  override def combine(
+records: Iterable[Row],
+out: Collector[Row]): Unit = {
+
+var head:Row = null
--- End diff --

add space


> Add session group-windows for batch tables
> ---
>
> Key: FLINK-4693
> URL: https://issues.apache.org/jira/browse/FLINK-4693
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829604#comment-15829604
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96820016
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+
+/**
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is 
used for Session time-window
+  * on batch.
+  * Note:
+  *  This can handle two input types:
+  *  1. when partial aggregate is not supported, the input data structure 
of reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
+  *  2. when partial aggregate is supported, the input data structure of 
reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity The intermediate row field count.
+  * @param finalRowArity The output row field count.
+  * @param finalRowWindowStartPos The relative window-start field position.
+  * @param finalRowWindowEndPos The relative window-end field position.
+  * @param gap Session time window gap.
+  */
+class DataSetSessionWindowAggregateReduceGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupKeysMapping: Array[(Int, Int)],
+aggregateMapping: Array[(Int, Int)],
+intermediateRowArity: Int,
+finalRowArity: Int,
+finalRowWindowStartPos: Option[Int],
+finalRowWindowEndPos: Option[Int],
+gap:Long)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var collector: TimeWindowPropertyCollector = _
+  private var intermediateRowWindowStartPos = 0
+  private var intermediateRowWindowEndPos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+aggregateBuffer = new Row(intermediateRowArity)
+intermediateRowWindowStartPos = intermediateRowArity - 2
+intermediateRowWindowEndPos = intermediateRowArity - 1
+output = new Row(finalRowArity)
+collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, 
finalRowWindowEndPos)
+  }
+
+  /**
+* For grouped intermediate aggregate Rows, divide window according to 
the window-start
+* and window-end, merge data (within a unified window) into an 
aggregate buffer, calculate
+* aggregated values output from aggregate buffer, and then set them 
into output
+* Row based on the mapping relationship between intermediate aggregate 
data and output data.
+*
+* @param records Grouped intermediate aggregate Rows iterator.
+* @param out The collector to hand results to.
+*
+*/
+  override def reduce(records: Iterable[Row], ou

[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829599#comment-15829599
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96815166
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupCombineFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+
+/**
+  * This wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupingKeys
+  * @param intermediateRowArity The intermediate row field count.
+  * @param gap  Session time window gap.
+  * @param intermediateRowType Intermediate row data type.
+  */
+class DataSetSessionWindowAggregateCombineGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupingKeys: Array[Int],
+intermediateRowArity: Int,
+gap: Long,
+@transient intermediateRowType: TypeInformation[Row])
+  extends RichGroupCombineFunction[Row,Row] with ResultTypeQueryable[Row] {
+
+  private var aggregateBuffer: Row = _
+  private var rowTimePos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupingKeys)
+aggregateBuffer = new Row(intermediateRowArity)
+rowTimePos = intermediateRowArity - 2
+  }
+
+  /**
+* For sub-grouped intermediate aggregate Rows, divide window based on 
the row-time
+* (current'row-time - previous’row-time > gap), and then merge data 
(within a unified window)
+* into an aggregate buffer.
+*
+* @param records  Sub-grouped intermediate aggregate Rows .
+* @return Combined intermediate aggregate Row.
+*
+*/
+  override def combine(
+records: Iterable[Row],
+out: Collector[Row]): Unit = {
+
+var head:Row = null
+var lastRowTime: java.lang.Long = null
+var currentRowTime: java.lang.Long = null
+
+val iterator = records.iterator()
+
+while (iterator.hasNext) {
+  val record = iterator.next()
+currentRowTime = record.getField(rowTimePos).asInstanceOf[Long]
--- End diff --

wrong indentation?


> Add session group-windows for batch tables
> ---
>
> Key: FLINK-4693
> URL: https://issues.apache.org/jira/browse/FLINK-4693
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829595#comment-15829595
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96810423
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
@@ -218,6 +216,85 @@ class DataSetWindowAggregate(
 }
   }
 
+  private[this] def createEventTimeSessionWindowDataSet(
+inputDS: DataSet[Any],
+isParserCaseSensitive: Boolean): DataSet[Any] = {
+
+val groupingKeys = grouping.indices.toArray
+val rowTypeInfo = resultRowTypeInfo
+
+// grouping window
+if (groupingKeys.length > 0) {
+  //create mapFunction for initializing the aggregations
+  val mapFunction = createDataSetWindowPrepareMapFunction(
+window,
+namedAggregates,
+grouping,
+inputType,isParserCaseSensitive)
--- End diff --

wrap last argument as well


> Add session group-windows for batch tables
> ---
>
> Key: FLINK-4693
> URL: https://issues.apache.org/jira/browse/FLINK-4693
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829605#comment-15829605
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96819808
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+
+/**
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is 
used for Session time-window
+  * on batch.
+  * Note:
+  *  This can handle two input types:
+  *  1. when partial aggregate is not supported, the input data structure 
of reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
+  *  2. when partial aggregate is supported, the input data structure of 
reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity The intermediate row field count.
+  * @param finalRowArity The output row field count.
+  * @param finalRowWindowStartPos The relative window-start field position.
+  * @param finalRowWindowEndPos The relative window-end field position.
+  * @param gap Session time window gap.
+  */
+class DataSetSessionWindowAggregateReduceGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupKeysMapping: Array[(Int, Int)],
+aggregateMapping: Array[(Int, Int)],
+intermediateRowArity: Int,
+finalRowArity: Int,
+finalRowWindowStartPos: Option[Int],
+finalRowWindowEndPos: Option[Int],
+gap:Long)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var collector: TimeWindowPropertyCollector = _
+  private var intermediateRowWindowStartPos = 0
+  private var intermediateRowWindowEndPos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+aggregateBuffer = new Row(intermediateRowArity)
+intermediateRowWindowStartPos = intermediateRowArity - 2
+intermediateRowWindowEndPos = intermediateRowArity - 1
+output = new Row(finalRowArity)
+collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, 
finalRowWindowEndPos)
+  }
+
+  /**
+* For grouped intermediate aggregate Rows, divide window according to 
the window-start
+* and window-end, merge data (within a unified window) into an 
aggregate buffer, calculate
+* aggregated values output from aggregate buffer, and then set them 
into output
+* Row based on the mapping relationship between intermediate aggregate 
data and output data.
+*
+* @param records Grouped intermediate aggregate Rows iterator.
+* @param out The collector to hand results to.
+*
+*/
+  override def reduce(records: Iterable[Row], ou

[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829606#comment-15829606
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96821333
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
@@ -218,6 +216,85 @@ class DataSetWindowAggregate(
 }
   }
 
+  private[this] def createEventTimeSessionWindowDataSet(
+inputDS: DataSet[Any],
+isParserCaseSensitive: Boolean): DataSet[Any] = {
+
+val groupingKeys = grouping.indices.toArray
+val rowTypeInfo = resultRowTypeInfo
+
+// grouping window
+if (groupingKeys.length > 0) {
+  //create mapFunction for initializing the aggregations
+  val mapFunction = createDataSetWindowPrepareMapFunction(
+window,
+namedAggregates,
+grouping,
+inputType,isParserCaseSensitive)
+
+  // create groupReduceFunction for calculating the aggregations
+  val groupReduceFunction = 
createDataSetWindowAggregationGroupReduceFunction(
+window,
+namedAggregates,
+inputType,
+rowRelDataType,
+grouping,
+namedProperties)
+
+  val mappedInput =
+inputDS
+.map(mapFunction)
+.name(prepareOperatorName)
+
+  val mapReturnType = 
mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
+
+  // the position of the rowtime field in the intermediate result for 
map output
+  val rowTimeFilePos = mapReturnType.getArity - 1
--- End diff --

should be `rowTimeFieldPos`?


> Add session group-windows for batch tables
> ---
>
> Key: FLINK-4693
> URL: https://issues.apache.org/jira/browse/FLINK-4693
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829598#comment-15829598
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96812620
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
 ---
@@ -119,4 +119,35 @@ class DataSetWindowAggregateITCase(
 val results = windowedTable.toDataSet[Row].collect()
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testEventTimeSessionGroupWindow(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 
'string)
+val windowedTable = table
+  .groupBy('string)
+  .window(Session withGap 7.milli on 'long as 'w)
+  .select('string, 'string.count, 'w.start, 'w.end)
+
+val results = windowedTable.toDataSet[Row].collect()
+
+val expected = "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 
00:00:00.009\nHello world,1," +
--- End diff --

can you break the lines at the end of the records? Makes it easier to read 
the expected data.
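
For illustration, one way to build the expected string with one record per line (the second record's timestamps are placeholders):

    val expected = Seq(
      "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009",
      "Hello world,1,<window-start>,<window-end>"   // placeholder timestamps
    ).mkString("\n")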


> Add session group-windows for batch tables
> ---
>
> Key: FLINK-4693
> URL: https://issues.apache.org/jira/browse/FLINK-4693
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5512) RabbitMQ documentation should inform that exactly-once holds for RMQSource only when parallelism is 1

2017-01-19 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-5512.

Resolution: Fixed

> RabbitMQ documentation should inform that exactly-once holds for RMQSource 
> only when parallelism is 1  
> ---
>
> Key: FLINK-5512
> URL: https://issues.apache.org/jira/browse/FLINK-5512
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.2.0
>
>
> See here for the reasoning: FLINK-2624. We should add an informative warning 
> about this limitation in the docs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829594#comment-15829594
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96810332
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
@@ -128,10 +128,8 @@ class DataSetWindowAggregate(
   inputDS,
   isTimeInterval(size.resultType),
   caseSensitive)
-
-  case EventTimeSessionGroupWindow(_, _, _) =>
-throw new UnsupportedOperationException(
-  "Event-time session windows in a batch environment are currently 
not supported")
+  case EventTimeSessionGroupWindow(_, _, gap) =>
+createEventTimeSessionWindowDataSet(inputDS,caseSensitive)
--- End diff --

add space: `(inputDS, caseSensitive)`


> Add session group-windows for batch tables
> ---
>
> Key: FLINK-4693
> URL: https://issues.apache.org/jira/browse/FLINK-4693
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829602#comment-15829602
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96820389
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+
+/**
+  * It wraps the aggregate logic inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is 
used for Session time-window
+  * on batch.
+  * Note:
+  *  This can handle two input types:
+  *  1. when partial aggregate is not supported, the input data structure 
of reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
+  *  2. when partial aggregate is supported, the input data structure of 
reduce is
+  *|groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
+  *
+  * @param aggregates The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+  * index in output Row.
+  * @param intermediateRowArity The intermediate row field count.
+  * @param finalRowArity The output row field count.
+  * @param finalRowWindowStartPos The relative window-start field position.
+  * @param finalRowWindowEndPos The relative window-end field position.
+  * @param gap Session time window gap.
+  */
+class DataSetSessionWindowAggregateReduceGroupFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+groupKeysMapping: Array[(Int, Int)],
+aggregateMapping: Array[(Int, Int)],
+intermediateRowArity: Int,
+finalRowArity: Int,
+finalRowWindowStartPos: Option[Int],
+finalRowWindowEndPos: Option[Int],
+gap:Long)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var collector: TimeWindowPropertyCollector = _
+  private var intermediateRowWindowStartPos = 0
+  private var intermediateRowWindowEndPos = 0
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+aggregateBuffer = new Row(intermediateRowArity)
+intermediateRowWindowStartPos = intermediateRowArity - 2
+intermediateRowWindowEndPos = intermediateRowArity - 1
+output = new Row(finalRowArity)
+collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, 
finalRowWindowEndPos)
+  }
+
+  /**
+* For grouped intermediate aggregate Rows, divide window according to 
the window-start
+* and window-end, merge data (within a unified window) into an 
aggregate buffer, calculate
+* aggregated values output from aggregate buffer, and then set them 
into output
+* Row based on the mapping relationship between intermediate aggregate 
data and output data.
+*
+* @param records Grouped intermediate aggregate Rows iterator.
+* @param out The collector to hand results to.
+*
+*/
+  override def reduce(records: Iterable[Row], ou

[jira] [Commented] (FLINK-4683) Add SlideRow row-windows for batch tables

2017-01-19 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829626#comment-15829626
 ] 

Fabian Hueske commented on FLINK-4683:
--

Hi [~sunjincheng121], thanks for picking up this issue.
FLIP-11 mainly describes the API for this feature, not the internal 
implementation.

So far none of the row windows (FLINK-4678 and FLINK-4679) has been implemented.
The other issues are assigned to [~jark] and [~twalthr].

I think we should first agree on a joint architecture before starting with an 
implementation. 
We should also keep in mind that some types of row windows are already 
supported by Calcite (as SQL OVER). Ideally, the execution code can be reused 
to process SQL windows as well.

What do you think [~sunjincheng121], [~jark], [~twalthr]?

> Add SlideRow row-windows for batch tables
> -
>
> Key: FLINK-4683
> URL: https://issues.apache.org/jira/browse/FLINK-4683
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: sunjincheng
>
> Add SlideRow row-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3150: [FLINK-4693][tableApi] Add session group-windows f...

2017-01-19 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96825508
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -226,27 +225,101 @@ object AggregateUtil {
   aggregates,
   groupingOffsetMapping,
   aggOffsetMapping,
-  intermediateRowArity,
+  intermediateRowArity + 1,// the addition one field is used to 
store time attribute
   outputType.getFieldCount)
+
+  case EventTimeSessionGroupWindow(_, _, gap) =>
+val (startPos, endPos) = if (isTimeWindow(window)) {
--- End diff --

Good catch!! 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829628#comment-15829628
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3150#discussion_r96825508
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -226,27 +225,101 @@ object AggregateUtil {
   aggregates,
   groupingOffsetMapping,
   aggOffsetMapping,
-  intermediateRowArity,
+  intermediateRowArity + 1,// the addition one field is used to 
store time attribute
   outputType.getFieldCount)
+
+  case EventTimeSessionGroupWindow(_, _, gap) =>
+val (startPos, endPos) = if (isTimeWindow(window)) {
--- End diff --

Good catch!! 


> Add session group-windows for batch tables
> ---
>
> Key: FLINK-4693
> URL: https://issues.apache.org/jira/browse/FLINK-4693
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-19 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r96643785
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Creates a table source that helps to scan data from an hbase table
+ *
+ * Note : the colNames are specified along with a familyName and they are 
seperated by a ':'
+ * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier 
name
+ */
+public class HBaseTableSource implements BatchTableSource, 
ProjectableTableSource {
+
+   private Configuration conf;
+   private String tableName;
+   private byte[] rowKey;
+   private String[] colNames;
+   private TypeInformation[] colTypes;
+
+   public HBaseTableSource(Configuration conf, String tableName, byte[] 
rowKey, String[] colNames,
+   TypeInformation[] 
colTypes) {
+   this.conf = conf;
+   this.tableName = Preconditions.checkNotNull(tableName, "Table  
name");
+   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
--- End diff --

What is the rowKey used for? I think we can remove it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-19 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r96823925
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends RichInputFormat implements ResultTypeQueryable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private TypeInformation[] fieldTypeInfos;
+   private String[] fieldNames;
+   private transient Table table;
+   private transient Scan scan;
+   private transient Connection conn;
+   private ResultScanner resultScanner = null;
+
+   private byte[] lastRow;
+   private int scannedRows;
+   private boolean endReached = false;
+   private org.apache.hadoop.conf.Configuration conf;
+   private static final String COLON = ":";
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
+   this.conf = conf;
+   this.tableName = tableName;
+   this.fieldNames = fieldNames;
+   this.fieldTypeInfos = fieldTypeInfos;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = createScanner();
+   }
+   }
+
+   private Scan createScanner() {
+   Scan scan = new Scan();
+   for(String field : fieldNames) {
+   // select only the fields in the 'selectedFields'
+   String[] famCol = field.split(COLON);
+   scan.addColumn(Bytes.toBytes(famCol[0]), 
Bytes.toBytes(famCol[1]));
+   }
+   return scan;
+   }
+
+   private void connectToTable() {
+   //use files found in the classpath
+   if(this.conf == null) {
+   this.conf = HBaseConfiguration.create();
+   

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-19 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r96824021
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends RichInputFormat implements ResultTypeQueryable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private TypeInformation[] fieldTypeInfos;
+   private String[] fieldNames;
+   private transient Table table;
+   private transient Scan scan;
+   private transient Connection conn;
+   private ResultScanner resultScanner = null;
+
+   private byte[] lastRow;
+   private int scannedRows;
+   private boolean endReached = false;
+   private org.apache.hadoop.conf.Configuration conf;
+   private static final String COLON = ":";
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
+   this.conf = conf;
+   this.tableName = tableName;
+   this.fieldNames = fieldNames;
+   this.fieldTypeInfos = fieldTypeInfos;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = createScanner();
+   }
+   }
+
+   private Scan createScanner() {
+   Scan scan = new Scan();
+   for(String field : fieldNames) {
+   // select only the fields in the 'selectedFields'
+   String[] famCol = field.split(COLON);
+   scan.addColumn(Bytes.toBytes(famCol[0]), 
Bytes.toBytes(famCol[1]));
+   }
+   return scan;
+   }
+
+   private void connectToTable() {
+   //use files found in the classpath
+   if(this.conf == null) {
+   this.conf = HBaseConfiguration.create();
+   

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829631#comment-15829631
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r96823925
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends RichInputFormat implements ResultTypeQueryable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private TypeInformation[] fieldTypeInfos;
+   private String[] fieldNames;
+   private transient Table table;
+   private transient Scan scan;
+   private transient Connection conn;
+   private ResultScanner resultScanner = null;
+
+   private byte[] lastRow;
+   private int scannedRows;
+   private boolean endReached = false;
+   private org.apache.hadoop.conf.Configuration conf;
+   private static final String COLON = ":";
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
+   this.conf = conf;
+   this.tableName = tableName;
+   this.fieldNames = fieldNames;
+   this.fieldTypeInfos = fieldTypeInfos;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = createScanner();
+   }
+   }
+
+   private Scan createScanner() {
+   Scan scan = new Scan();
+   for(String field : fieldNames) {
+   // select only the fields in the 'selectedFields'
+   String[] famCol = field.split(COLON);
+   scan.addColumn(Bytes.toBytes(famCol[0]), 
Bytes.toBytes(famCol[1]));
+  

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829629#comment-15829629
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r96643785
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Creates a table source that helps to scan data from an hbase table
+ *
+ * Note : the colNames are specified along with a familyName and they are 
seperated by a ':'
+ * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier 
name
+ */
+public class HBaseTableSource implements BatchTableSource, 
ProjectableTableSource {
+
+   private Configuration conf;
+   private String tableName;
+   private byte[] rowKey;
+   private String[] colNames;
+   private TypeInformation[] colTypes;
+
+   public HBaseTableSource(Configuration conf, String tableName, byte[] 
rowKey, String[] colNames,
+   TypeInformation[] 
colTypes) {
+   this.conf = conf;
+   this.tableName = Preconditions.checkNotNull(tableName, "Table  
name");
+   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
--- End diff --

What is the rowKey used for ?  I think we can remove it.
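
For concreteness, here is a minimal, hypothetical usage sketch of the constructor
exactly as it appears in this diff (including the rowKey argument in question).
The table name, column spec and row key value are invented placeholders, and the
surrounding calls assume the 1.2-era batch `TableEnvironment`; this illustrates
the proposed interface, not final code.

```scala
import org.apache.flink.addons.hbase.HBaseTableSource
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.util.Bytes

object HBaseTableSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Columns are specified as "family:qualifier", matching the Javadoc above.
    val source = new HBaseTableSource(
      HBaseConfiguration.create(),   // picks up hbase-site.xml from the classpath
      "testTable",                   // placeholder table name
      Bytes.toBytes("rowkey"),       // the rowKey argument questioned above
      Array("cf1:q1"),
      Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO))

    // Once registered, the source can be queried through the Table API or SQL.
    tableEnv.registerTableSource("hTable", source)
  }
}
```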


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829630#comment-15829630
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r96824021
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends RichInputFormat implements ResultTypeQueryable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private TypeInformation[] fieldTypeInfos;
+   private String[] fieldNames;
+   private transient Table table;
+   private transient Scan scan;
+   private transient Connection conn;
+   private ResultScanner resultScanner = null;
+
+   private byte[] lastRow;
+   private int scannedRows;
+   private boolean endReached = false;
+   private org.apache.hadoop.conf.Configuration conf;
+   private static final String COLON = ":";
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
+   this.conf = conf;
+   this.tableName = tableName;
+   this.fieldNames = fieldNames;
+   this.fieldTypeInfos = fieldTypeInfos;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = createScanner();
+   }
+   }
+
+   private Scan createScanner() {
+   Scan scan = new Scan();
+   for(String field : fieldNames) {
+   // select only the fields in the 'selectedFields'
+   String[] famCol = field.split(COLON);
+   scan.addColumn(Bytes.toBytes(famCol[0]), 
Bytes.toBytes(famCol[1]));
+  

[jira] [Commented] (FLINK-5472) WebUI endpoint not advertised correctly to Mesos

2017-01-19 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829638#comment-15829638
 ] 

Till Rohrmann commented on FLINK-5472:
--

Thanks [~eronwright] for pointing this out to me. Will FLINK-5030 enable HTTPS 
support by default even if the general Flink {{security.ssl.enabled}} option is 
set to {{false}}?

> WebUI endpoint not advertised correctly to Mesos
> 
>
> Key: FLINK-5472
> URL: https://issues.apache.org/jira/browse/FLINK-5472
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, Webfrontend
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
> Fix For: 1.3.0
>
>
> When trying to access {{https://jobmanager:port}}, chrome says that the 
> webserver answered with an invalid response {{ERR_SSL_PROTOCOL_ERROR}}.
> This happens, for example, when one tries to access Flink's web UI from the 
> DC/OS dashboard via the endpoint links.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation

2017-01-19 Thread zhangjing (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangjing reassigned FLINK-5568:


Assignee: zhangjing

> Introduce interface for catalog, and provide an in-memory implementation
> 
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: zhangjing
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3150: [FLINK-4693][tableApi] Add session group-windows for batc...

2017-01-19 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3150
  
@fhueske Thank you very much for your detailed review. Your optimization 
suggestions are exactly the kind of points I should take care of, and I will 
update the PR as soon as possible. Thanks again!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4693) Add session group-windows for batch tables

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829654#comment-15829654
 ] 

ASF GitHub Bot commented on FLINK-4693:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/3150
  
@fhueske Thank you very much for your detailed review. Your optimization 
suggestions are exactly the kind of points I should take care of, and I will 
update the PR as soon as possible. Thanks again!


> Add session group-windows for batch tables
> ---
>
> Key: FLINK-4693
> URL: https://issues.apache.org/jira/browse/FLINK-4693
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: sunjincheng
>
> Add Session group-windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3077: [FLINK-5423] Implement Stochastic Outlier Selection

2017-01-19 Thread Fokko
Github user Fokko commented on the issue:

https://github.com/apache/flink/pull/3077
  
@tillrohrmann I've added documentation about the algorithm. Can you check? 

Cheers, Fokko


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5423) Implement Stochastic Outlier Selection

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829661#comment-15829661
 ] 

ASF GitHub Bot commented on FLINK-5423:
---

Github user Fokko commented on the issue:

https://github.com/apache/flink/pull/3077
  
@tillrohrmann I've added documentation about the algorithm. Can you check? 

Cheers, Fokko


> Implement Stochastic Outlier Selection
> --
>
> Key: FLINK-5423
> URL: https://issues.apache.org/jira/browse/FLINK-5423
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Fokko Driesprong
>Assignee: Fokko Driesprong
>
> I've implemented the Stochastic Outlier Selection (SOS) algorithm by Jeroen 
> Janssens.
> http://jeroenjanssens.com/2013/11/24/stochastic-outlier-selection.html
> It is integrated as much as possible with the components from the machine 
> learning library.
> The algorithm itself has been compared to four other algorithms, and it shows 
> that SOS has higher performance on most of these real-world datasets. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2017-01-19 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
The last update implements a variant of what @StephanEwen proposes. We have 
also put some more thought into this offline, thanks to @gyfora! The idea is 
that instead of putting records into each `ListState`, the output log is 
partitioned into multiple log "slices", one per concurrent checkpoint.

More specifically, the `UpstreamLogger` operator at the `IterationHead` 
slices logs proportionally to the number of concurrent snapshots. This also 
allows committed output logs to be uniquely identified and cleared after each 
complete checkpoint. The design is based on the following assumptions:

- A slice is named after a checkpoint ID. Checkpoint IDs are numerically 
ordered within an execution.
- Each checkpoint barrier arrives back in FIFO order, thus we discard log 
slices in respective FIFO order.
 - Upon restoration the logger sorts sliced logs in the same FIFO order and 
returns an Iterable that gives a singular view of the log.

Before I polish this we need to close a memory leak. The `clear` operation 
of `State` cleans the state under the registered id but it does not seem to 
unregister the key itself. Does anyone have an idea on how to unregister state 
properly? Hope this gets some attention to wrap it up, it's been too long :). 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829678#comment-15829678
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
The last update implements a variant of what @StephanEwen proposes. We have 
also put some more thought into this offline, thanks to @gyfora! The idea is 
that instead of putting records into each `ListState`, the output log is 
partitioned into multiple log "slices", one per concurrent checkpoint.

More specifically, the `UpstreamLogger` operator at the `IterationHead` 
slices logs proportionally to the number of concurrent snapshots. This also 
allows committed output logs to be uniquely identified and cleared after each 
complete checkpoint. The design is based on the following assumptions:

- A slice is named after a checkpoint ID. Checkpoint IDs are numerically 
ordered within an execution.
- Each checkpoint barrier arrives back in FIFO order, thus we discard log 
slices in respective FIFO order.
 - Upon restoration the logger sorts sliced logs in the same FIFO order and 
returns an Iterable that gives a singular view of the log.

Before I polish this we need to close a memory leak. The `clear` operation 
of `State` cleans the state under the registered id but it does not seem to 
unregister the key itself. Does anyone have an idea on how to unregister state 
properly? Hope this gets some attention to wrap it up, it's been too long :). 



> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along 
> the following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> output and start upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output and emit all records 
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can serve 
> as the initial implementation.
> [1] http://arxiv.org/abs/1506.08603



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2885: [FLINK-1707] Affinity propagation

2017-01-19 Thread vasia
Github user vasia commented on the issue:

https://github.com/apache/flink/pull/2885
  
Hi @joseprupi, thank you for the PR! This should replace #2053, right? If 
yes, could you please close #2053? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1707) Add an Affinity Propagation Library Method

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829686#comment-15829686
 ] 

ASF GitHub Bot commented on FLINK-1707:
---

Github user vasia commented on the issue:

https://github.com/apache/flink/pull/2885
  
Hi @joseprupi, thank you for the PR! This should replace #2053, right? If 
yes, could you please close #2053? Thanks!


> Add an Affinity Propagation Library Method
> --
>
> Key: FLINK-1707
> URL: https://issues.apache.org/jira/browse/FLINK-1707
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Josep Rubió
>Priority: Minor
>  Labels: requires-design-doc
> Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf
>
>
> This issue proposes adding an implementation of the Affinity Propagation 
> algorithm as a Gelly library method and a corresponding example.
> The algorithm is described in paper [1] and a description of a vertex-centric 
> implementation can be found in [2].
> [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf
> [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf
> Design doc:
> https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing
> Example spreadsheet:
> https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing
> Graph:
> https://docs.google.com/drawings/d/1PC3S-6AEt2Gp_TGrSfiWzkTcL7vXhHSxvM6b9HglmtA/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5415) ContinuousFileProcessingTest failed on travis

2017-01-19 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829692#comment-15829692
 ] 

Ufuk Celebi commented on FLINK-5415:


https://s3.amazonaws.com/archive.travis-ci.org/jobs/193115046/log.txt

> ContinuousFileProcessingTest failed on travis
> -
>
> Key: FLINK-5415
> URL: https://issues.apache.org/jira/browse/FLINK-5415
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.3.0
>Reporter: Chesnay Schepler
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/189171123/log.txt
> testFunctionRestore(org.apache.flink.hdfstests.ContinuousFileProcessingTest)  
> Time elapsed: 0.162 sec  <<< FAILURE!
> java.lang.AssertionError: expected:<1483623669528> but 
> was:<-9223372036854775808>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> org.apache.flink.hdfstests.ContinuousFileProcessingTest.testFunctionRestore(ContinuousFileProcessingTest.java:761)
> testProcessOnce(org.apache.flink.hdfstests.ContinuousFileProcessingTest)  
> Time elapsed: 0.045 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertFalse(Assert.java:64)
>   at org.junit.Assert.assertFalse(Assert.java:74)
>   at 
> org.apache.flink.hdfstests.ContinuousFileProcessingTest.createFileAndFillWithData(ContinuousFileProcessingTest.java:958)
>   at 
> org.apache.flink.hdfstests.ContinuousFileProcessingTest.testProcessOnce(ContinuousFileProcessingTest.java:675)
> testFileReadingOperatorWithIngestionTime(org.apache.flink.hdfstests.ContinuousFileProcessingTest)
>   Time elapsed: 0.002 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertFalse(Assert.java:64)
>   at org.junit.Assert.assertFalse(Assert.java:74)
>   at 
> org.apache.flink.hdfstests.ContinuousFileProcessingTest.createFileAndFillWithData(ContinuousFileProcessingTest.java:958)
>   at 
> org.apache.flink.hdfstests.ContinuousFileProcessingTest.testFileReadingOperatorWithIngestionTime(ContinuousFileProcessingTest.java:150)
> testSortingOnModTime(org.apache.flink.hdfstests.ContinuousFileProcessingTest) 
>  Time elapsed: 0.002 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertFalse(Assert.java:64)
>   at org.junit.Assert.assertFalse(Assert.java:74)
>   at 
> org.apache.flink.hdfstests.ContinuousFileProcessingTest.createFileAndFillWithData(ContinuousFileProcessingTest.java:958)
>   at 
> org.apache.flink.hdfstests.ContinuousFileProcessingTest.testSortingOnModTime(ContinuousFileProcessingTest.java:596)
> testFileReadingOperatorWithEventTime(org.apache.flink.hdfstests.ContinuousFileProcessingTest)
>   Time elapsed: 0.001 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertFalse(Assert.java:64)
>   at org.junit.Assert.assertFalse(Assert.java:74)
>   at 
> org.apache.flink.hdfstests.ContinuousFileProcessingTest.createFileAndFillWithData(ContinuousFileProcessingTest.java:958)
>   at 
> org.apache.flink.hdfstests.ContinuousFileProcessingTest.testFileReadingOperatorWithEventTime(ContinuousFileProcessingTest.java:308)
> testProcessContinuously(org.apache.flink.hdfstests.ContinuousFileProcessingTest)
>   Time elapsed: 0.001 sec  <<< FAILURE!
> java.lang.AssertionError: null
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertFalse(Assert.java:64)
>   at org.junit.Assert.assertFalse(Assert.java:74)
>   at 
> org.apache.flink.hdfstests.ContinuousFileProcessingTest.createFileAndFillWithData(ContinuousFileProcessingTest.java:958)
>   at 
> org.apache.flink.hdfstests.ContinuousFileProcessingTest.testProcessContinuously(ContinuousFileProcessingTest.java:771)
> Results :
> Failed tests: 
>   
> ContinuousFileProcessingTest.testFileReadingOperatorWithEventTime:308->createFileAndFillWithData:958
>  null
>   
> ContinuousFileProcessingTest.testFileReadingOperatorWithIngestionTime:150->createFileAndFillWithData:958
>  null
>   ContinuousFileProcessingTest.testFunctionRestore:761 
> expected:<1483623669528> but was:<-9223372036854775808>
>   
> ContinuousFileProcessingTest.testProcessContinuously:771->createFileAn

[GitHub] flink issue #3010: [minor] Fix String formats in FlinkDistributionOverlay an...

2017-01-19 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3010
  
Yes I think it does @zentol. +1 for merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2053: [FLINK-1707] Affinity Propagation

2017-01-19 Thread joseprupi
Github user joseprupi closed the pull request at:

https://github.com/apache/flink/pull/2053


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1707) Add an Affinity Propagation Library Method

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829717#comment-15829717
 ] 

ASF GitHub Bot commented on FLINK-1707:
---

Github user joseprupi closed the pull request at:

https://github.com/apache/flink/pull/2053


> Add an Affinity Propagation Library Method
> --
>
> Key: FLINK-1707
> URL: https://issues.apache.org/jira/browse/FLINK-1707
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Josep Rubió
>Priority: Minor
>  Labels: requires-design-doc
> Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf
>
>
> This issue proposes adding an implementation of the Affinity Propagation 
> algorithm as a Gelly library method and a corresponding example.
> The algorithm is described in paper [1] and a description of a vertex-centric 
> implementation can be found in [2].
> [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf
> [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf
> Design doc:
> https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing
> Example spreadsheet:
> https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing
> Graph:
> https://docs.google.com/drawings/d/1PC3S-6AEt2Gp_TGrSfiWzkTcL7vXhHSxvM6b9HglmtA/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1707) Add an Affinity Propagation Library Method

2017-01-19 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-1707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829719#comment-15829719
 ] 

Josep Rubió commented on FLINK-1707:


Closed :)

> Add an Affinity Propagation Library Method
> --
>
> Key: FLINK-1707
> URL: https://issues.apache.org/jira/browse/FLINK-1707
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Reporter: Vasia Kalavri
>Assignee: Josep Rubió
>Priority: Minor
>  Labels: requires-design-doc
> Attachments: Binary_Affinity_Propagation_in_Flink_design_doc.pdf
>
>
> This issue proposes adding an implementation of the Affinity Propagation 
> algorithm as a Gelly library method and a corresponding example.
> The algorithm is described in paper [1] and a description of a vertex-centric 
> implementation can be found in [2].
> [1]: http://www.psi.toronto.edu/affinitypropagation/FreyDueckScience07.pdf
> [2]: http://event.cwi.nl/grades2014/00-ching-slides.pdf
> Design doc:
> https://docs.google.com/document/d/1QULalzPqMVICi8jRVs3S0n39pell2ZVc7RNemz_SGA4/edit?usp=sharing
> Example spreadsheet:
> https://docs.google.com/spreadsheets/d/1CurZCBP6dPb1IYQQIgUHVjQdyLxK0JDGZwlSXCzBcvA/edit?usp=sharing
> Graph:
> https://docs.google.com/drawings/d/1PC3S-6AEt2Gp_TGrSfiWzkTcL7vXhHSxvM6b9HglmtA/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3130: [FLINK-5502] Add the function migration guide in docs.

2017-01-19 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/3130
  
Thanks @alpinegizmo for the review! I integrated your comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5533) DCOS Integration

2017-01-19 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829721#comment-15829721
 ] 

Till Rohrmann commented on FLINK-5533:
--

Hi [~mgummelt], [~joerg84] and [~Makman2] are also working on the DC/OS Flink 
package. The idea was to try to release it with the upcoming Flink 1.2 release 
if possible. Flink 1.2 is currently being tested and might be released in the 
next couple of weeks.

> DCOS Integration
> 
>
> Key: FLINK-5533
> URL: https://issues.apache.org/jira/browse/FLINK-5533
> Project: Flink
>  Issue Type: New Feature
>  Components: Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>
> Umbrella issue for DCOS integration, including production-level features but 
> not future improvements/bugs (for which a new 'DCOS' component might work 
> best).
> Design doc:
> [https://docs.google.com/document/d/14uyKG1VBXuV5_Xu9wUdrK97EcKmyn73wk0oP51S91hM/edit?usp=sharing]
> DCOS packaging code:
> [https://github.com/mesosphere/dcos-flink-service]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5502) Add documentation about migrating functions from 1.1 to 1.2

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829720#comment-15829720
 ] 

ASF GitHub Bot commented on FLINK-5502:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/3130
  
Thanks @alpinegizmo for the review! I integrated your comments.


> Add documentation about migrating functions from 1.1 to 1.2
> ---
>
> Key: FLINK-5502
> URL: https://issues.apache.org/jira/browse/FLINK-5502
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>  Labels: documentation
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager

2017-01-19 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829725#comment-15829725
 ] 

Till Rohrmann commented on FLINK-4364:
--

Thanks [~wangzj] :-) I'll try to review it asap.

> Implement TaskManager side of heartbeat from JobManager
> ---
>
> Key: FLINK-4364
> URL: https://issues.apache.org/jira/browse/FLINK-4364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and 
> the {{TaskManager}} will report metrics info for each heartbeat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2974: [FLINK-5298] TM checks that log file exists

2017-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r96843747
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -830,19 +830,24 @@ class TaskManager(
   case StdOutFileRequest =>
 new File(logFilePath.substring(0, logFilePath.length - 4) + 
".out");
 }
-val fis = new FileInputStream(file);
-Future {
-  val client: BlobClient = blobService.get.createClient()
-  client.put(fis);
-}(context.dispatcher)
-  .onComplete {
-case Success(value) => 
-  sender ! value
-  fis.close()
-case Failure(e) =>
-  sender ! e
-  fis.close()
+if (file.exists()) {
+  val fis = new FileInputStream(file);
+  Future {
+val client: BlobClient = blobService.get.createClient()
+client.put(fis);
   }(context.dispatcher)
+.onComplete {
+  case Success(value) =>
+sender ! value
+fis.close()
+  case Failure(e) =>
+sender ! e
--- End diff --

And here `akka.actor.Status.Failure`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5298) TaskManager crashes when TM log not existant

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829768#comment-15829768
 ] 

ASF GitHub Bot commented on FLINK-5298:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r96843696
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -821,7 +821,7 @@ class TaskManager(
 val logFilePathOption = Option(config.configuration.getString(
   ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
System.getProperty("log.file")));
 logFilePathOption match {
-  case None => throw new IOException("TaskManager log files are 
unavailable. " +
+  case None => sender ! new IOException("TaskManager log files are 
unavailable. " +
--- End diff --

In order to send an `Exception` over the wire to the sender it has to be 
packed into `akka.actor.Status.Failure`. Otherwise it will only be sent as a 
normal response type.
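
For illustration, a minimal sketch of the suggested change (a hypothetical
snippet, not the PR's final code): wrapping the exception in
`akka.actor.Status.Failure` makes an `ask` on the calling side complete its
Future exceptionally, whereas a plain reply just delivers the exception as an
ordinary message.

```scala
import java.io.IOException
import akka.actor.{Actor, Status}

class LogRequestHandler extends Actor {
  override def receive: Receive = {
    case "RequestTaskManagerLog" =>
      // Plain reply: the exception arrives at the sender as a normal message.
      //   sender() ! new IOException("TaskManager log files are unavailable.")
      // Wrapped reply: an `ask` on the sender side fails its Future with this exception.
      sender() ! Status.Failure(new IOException("TaskManager log files are unavailable."))
  }
}
```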


> TaskManager crashes when TM log not existant
> 
>
> Key: FLINK-5298
> URL: https://issues.apache.org/jira/browse/FLINK-5298
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, TaskManager, Webfrontend
>Affects Versions: 1.1.0, 1.2.0
>Reporter: Mischa Krüger
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.2.0
>
>
> {code}
> java.io.FileNotFoundException: flink-taskmanager.out (No such file or 
> directory)
> at java.io.FileInputStream.open0(Native Method)
> at java.io.FileInputStream.open(FileInputStream.java:195)
> at java.io.FileInputStream.<init>(FileInputStream.java:138)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRequestTaskManagerLog(TaskManager.scala:833)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:340)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2016-12-08 16:45:14,995 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager  - Stopping 
> TaskManager akka://flink/user/taskmanager#1361882659.
> 2016-12-08 16:45:14,995 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager  - 
> Disassociating from JobManager
> 2016-12-08 16:45:14,997 INFO  org.apache.flink.runtime.blob.BlobCache 
>   - Shutting down BlobCache
> 2016-12-08 16:45:15,006 INFO  
> org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager 
> removed spill file directory 
> /tmp/flink-io-e61f717b-630c-4a2a-b3e3-62ccb40aa2f9
> 2016-12-08 16:45:15,006 INFO  
> org.apache.flink.runtime.io.network.NetworkEnvironment- Shutting down 
> the network environment and its components.
> 2016-12-08 16:45:15,008 INFO  
> org.apache.flink.runtime.io.network.netty.NettyClient - Successful 
> shutdown (took 1 ms).
> 2016-12-08 16:45:15,00

[jira] [Commented] (FLINK-5298) TaskManager crashes when TM log not existant

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829765#comment-15829765
 ] 

ASF GitHub Bot commented on FLINK-5298:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r96843830
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -830,19 +830,24 @@ class TaskManager(
   case StdOutFileRequest =>
 new File(logFilePath.substring(0, logFilePath.length - 4) + 
".out");
 }
-val fis = new FileInputStream(file);
-Future {
-  val client: BlobClient = blobService.get.createClient()
-  client.put(fis);
-}(context.dispatcher)
-  .onComplete {
-case Success(value) => 
-  sender ! value
-  fis.close()
-case Failure(e) =>
-  sender ! e
-  fis.close()
+if (file.exists()) {
+  val fis = new FileInputStream(file);
+  Future {
+val client: BlobClient = blobService.get.createClient()
+client.put(fis);
   }(context.dispatcher)
+.onComplete {
+  case Success(value) =>
+sender ! value
+fis.close()
+  case Failure(e) =>
+sender ! e
+fis.close()
+}(context.dispatcher)
+} else {
+  sender ! new IOException("TaskManager log files are unavailable. 
" +
+"Log file does not exist.")
--- End diff --

Maybe we could add the path under which we've looked for the log file to 
the error message.
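
A small, hypothetical sketch of that suggestion, combined with the
`Status.Failure` wrapping discussed above (the helper name is made up); the
`else` branch could then reply with `sender ! LogFileErrors.logFileMissing(file)`:

```scala
import java.io.{File, IOException}
import akka.actor.Status

object LogFileErrors {
  /** Build the failure reply, including the path that was actually checked. */
  def logFileMissing(file: File): Status.Failure =
    Status.Failure(new IOException(
      s"TaskManager log files are unavailable. Log file '${file.getAbsolutePath}' " +
        "does not exist."))
}
```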


> TaskManager crashes when TM log not existant
> 
>
> Key: FLINK-5298
> URL: https://issues.apache.org/jira/browse/FLINK-5298
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, TaskManager, Webfrontend
>Affects Versions: 1.1.0, 1.2.0
>Reporter: Mischa Krüger
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.2.0
>
>
> {code}
> java.io.FileNotFoundException: flink-taskmanager.out (No such file or 
> directory)
> at java.io.FileInputStream.open0(Native Method)
> at java.io.FileInputStream.open(FileInputStream.java:195)
> at java.io.FileInputStream.<init>(FileInputStream.java:138)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRequestTaskManagerLog(TaskManager.scala:833)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:340)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2016-12-08 16:45:14,995 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager  - Stopping 
> TaskManager akk

[jira] [Commented] (FLINK-5298) TaskManager crashes when TM log not existant

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829766#comment-15829766
 ] 

ASF GitHub Bot commented on FLINK-5298:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r96843747
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -830,19 +830,24 @@ class TaskManager(
   case StdOutFileRequest =>
 new File(logFilePath.substring(0, logFilePath.length - 4) + 
".out");
 }
-val fis = new FileInputStream(file);
-Future {
-  val client: BlobClient = blobService.get.createClient()
-  client.put(fis);
-}(context.dispatcher)
-  .onComplete {
-case Success(value) => 
-  sender ! value
-  fis.close()
-case Failure(e) =>
-  sender ! e
-  fis.close()
+if (file.exists()) {
+  val fis = new FileInputStream(file);
+  Future {
+val client: BlobClient = blobService.get.createClient()
+client.put(fis);
   }(context.dispatcher)
+.onComplete {
+  case Success(value) =>
+sender ! value
+fis.close()
+  case Failure(e) =>
+sender ! e
--- End diff --

And here `akka.actor.Status.Failure`.


> TaskManager crashes when TM log not existant
> 
>
> Key: FLINK-5298
> URL: https://issues.apache.org/jira/browse/FLINK-5298
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, TaskManager, Webfrontend
>Affects Versions: 1.1.0, 1.2.0
>Reporter: Mischa Krüger
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.2.0
>
>
> {code}
> java.io.FileNotFoundException: flink-taskmanager.out (No such file or 
> directory)
> at java.io.FileInputStream.open0(Native Method)
> at java.io.FileInputStream.open(FileInputStream.java:195)
> at java.io.FileInputStream.<init>(FileInputStream.java:138)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRequestTaskManagerLog(TaskManager.scala:833)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:340)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2016-12-08 16:45:14,995 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager  - Stopping 
> TaskManager akka://flink/user/taskmanager#1361882659.
> 2016-12-08 16:45:14,995 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager  - 
> Disassociating from JobManager
> 2016-12-08 16:45:14,997 INFO  org.apache.flink.runtime.blob.BlobCache 
>   - Shut

[GitHub] flink pull request #2974: [FLINK-5298] TM checks that log file exists

2017-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r96843696
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -821,7 +821,7 @@ class TaskManager(
 val logFilePathOption = Option(config.configuration.getString(
   ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, 
System.getProperty("log.file")));
 logFilePathOption match {
-  case None => throw new IOException("TaskManager log files are 
unavailable. " +
+  case None => sender ! new IOException("TaskManager log files are 
unavailable. " +
--- End diff --

In order to send an `Exception` over the wire to the sender it has to be 
packed into `akka.actor.Status.Failure`. Otherwise it will only be sent as a 
normal response type.
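
For illustration, a minimal, self-contained sketch of the pattern (class and 
message names are made up for the example, not taken from the Flink code base):

```scala
import java.io.IOException
import akka.actor.{Actor, ActorSystem, Props, Status}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Replying with Status.Failure makes the asking side's Future complete
// exceptionally; replying with the bare exception would just hand it over
// as a successful result value.
class LogRequestHandler extends Actor {
  def receive = {
    case "requestLog" =>
      sender ! Status.Failure(new IOException("TaskManager log files are unavailable."))
  }
}

object StatusFailureSketch extends App {
  implicit val timeout = Timeout(5.seconds)
  val system = ActorSystem("sketch")
  val handler = system.actorOf(Props[LogRequestHandler], "log-request-handler")

  // Prints Failure(java.io.IOException: ...), i.e. the ask itself failed.
  println(Try(Await.result(handler ? "requestLog", 5.seconds)))
  system.shutdown()
}
```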


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2974: [FLINK-5298] TM checks that log file exists

2017-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r96843719
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -830,19 +830,24 @@ class TaskManager(
   case StdOutFileRequest =>
 new File(logFilePath.substring(0, logFilePath.length - 4) + 
".out");
 }
-val fis = new FileInputStream(file);
-Future {
-  val client: BlobClient = blobService.get.createClient()
-  client.put(fis);
-}(context.dispatcher)
-  .onComplete {
-case Success(value) => 
-  sender ! value
-  fis.close()
-case Failure(e) =>
-  sender ! e
-  fis.close()
+if (file.exists()) {
+  val fis = new FileInputStream(file);
+  Future {
+val client: BlobClient = blobService.get.createClient()
+client.put(fis);
   }(context.dispatcher)
+.onComplete {
+  case Success(value) =>
+sender ! value
+fis.close()
+  case Failure(e) =>
+sender ! e
+fis.close()
+}(context.dispatcher)
+} else {
+  sender ! new IOException("TaskManager log files are unavailable. 
" +
--- End diff --

The same here with `akka.actor.Status.Failure`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5298) TaskManager crashes when TM log not existent

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829767#comment-15829767
 ] 

ASF GitHub Bot commented on FLINK-5298:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r96843719
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -830,19 +830,24 @@ class TaskManager(
   case StdOutFileRequest =>
 new File(logFilePath.substring(0, logFilePath.length - 4) + 
".out");
 }
-val fis = new FileInputStream(file);
-Future {
-  val client: BlobClient = blobService.get.createClient()
-  client.put(fis);
-}(context.dispatcher)
-  .onComplete {
-case Success(value) => 
-  sender ! value
-  fis.close()
-case Failure(e) =>
-  sender ! e
-  fis.close()
+if (file.exists()) {
+  val fis = new FileInputStream(file);
+  Future {
+val client: BlobClient = blobService.get.createClient()
+client.put(fis);
   }(context.dispatcher)
+.onComplete {
+  case Success(value) =>
+sender ! value
+fis.close()
+  case Failure(e) =>
+sender ! e
+fis.close()
+}(context.dispatcher)
+} else {
+  sender ! new IOException("TaskManager log files are unavailable. 
" +
--- End diff --

The same here with `akka.actor.Status.Failure`.


> TaskManager crashes when TM log not existent
> 
>
> Key: FLINK-5298
> URL: https://issues.apache.org/jira/browse/FLINK-5298
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, TaskManager, Webfrontend
>Affects Versions: 1.1.0, 1.2.0
>Reporter: Mischa Krüger
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.2.0
>
>
> {code}
> java.io.FileNotFoundException: flink-taskmanager.out (No such file or 
> directory)
> at java.io.FileInputStream.open0(Native Method)
> at java.io.FileInputStream.open(FileInputStream.java:195)
> at java.io.FileInputStream.(FileInputStream.java:138)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRequestTaskManagerLog(TaskManager.scala:833)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:340)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2016-12-08 16:45:14,995 INFO  
> org.apache.flink.mesos.runtime.clusterframework.MesosTaskManager  - Stopping 
> TaskManager akka://flink/user/taskmanager#1361882659.
> 2016-12-08 16:45:14,995 INFO  
> org.apache.flink

[GitHub] flink pull request #2974: [FLINK-5298] TM checks that log file exists

2017-01-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2974#discussion_r96843830
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -830,19 +830,24 @@ class TaskManager(
   case StdOutFileRequest =>
 new File(logFilePath.substring(0, logFilePath.length - 4) + 
".out");
 }
-val fis = new FileInputStream(file);
-Future {
-  val client: BlobClient = blobService.get.createClient()
-  client.put(fis);
-}(context.dispatcher)
-  .onComplete {
-case Success(value) => 
-  sender ! value
-  fis.close()
-case Failure(e) =>
-  sender ! e
-  fis.close()
+if (file.exists()) {
+  val fis = new FileInputStream(file);
+  Future {
+val client: BlobClient = blobService.get.createClient()
+client.put(fis);
   }(context.dispatcher)
+.onComplete {
+  case Success(value) =>
+sender ! value
+fis.close()
+  case Failure(e) =>
+sender ! e
+fis.close()
+}(context.dispatcher)
+} else {
+  sender ! new IOException("TaskManager log files are unavailable. 
" +
+"Log file does not exist.")
--- End diff --

Maybe we could add the path under which we've looked for the log file to 
the error message.
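
For example, something along these lines (a sketch only, reusing the `file` and 
`sender` names from the diff and assuming the existing `java.io.IOException` 
import; it also applies the `Status.Failure` wrapping mentioned in the other 
comments):

```scala
// include the probed path and fail the remote ask-Future instead of
// answering with the exception as a normal value
sender ! akka.actor.Status.Failure(
  new IOException("TaskManager log files are unavailable. " +
    s"Log file ${file.getAbsolutePath} does not exist."))
```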


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3141: [FLINK-5520] [table] Disable outer joins with non-...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3141#discussion_r96843888
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
 ---
@@ -375,4 +395,62 @@ class JoinITCase(
 
 Assert.assertEquals(0, result)
   }
+
+  @Test(expected = classOf[TableException])
+  def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = 
e and a > d"
+
+val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+val ds2 = 
CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testLeftOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = 
e and a > d"
+
+val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+val ds2 = 
CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFullOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = 
e and a > d"
+
+val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+val ds2 = 
CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
--- End diff --

remove


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3141: [FLINK-5520] [table] Disable outer joins with non-...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3141#discussion_r96856158
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
 ---
@@ -83,6 +83,27 @@ class JoinITCase(
 val env = ExecutionEnvironment.getExecutionEnvironment
 val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6"
--- End diff --

I think this test covers the same case as the one above 
(`testJoinWithFilter()`).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3141: [FLINK-5520] [table] Disable outer joins with non-...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3141#discussion_r96843825
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
 ---
@@ -375,4 +395,62 @@ class JoinITCase(
 
 Assert.assertEquals(0, result)
   }
+
+  @Test(expected = classOf[TableException])
+  def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = 
e and a > d"
+
+val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+val ds2 = 
CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testLeftOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = 
e and a > d"
+
+val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+val ds2 = 
CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
--- End diff --

remove


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3141: [FLINK-5520] [table] Disable outer joins with non-...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3141#discussion_r96841502
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -454,30 +454,54 @@ case class Join(
 
   private def testJoinCondition(expression: Expression): Unit = {
 
-def checkIfJoinCondition(exp : BinaryComparison) = exp.children match {
-case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil
-  if x.isFromLeftInput != y.isFromLeftInput => Unit
-case x => failValidation(
-  s"Invalid non-join predicate $exp. For non-join predicates use 
Table#where.")
-  }
+def checkIfJoinCondition(exp: BinaryComparison) = exp.children match {
+  case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil
+if x.isFromLeftInput != y.isFromLeftInput => true
+  case x => false
--- End diff --

`case x` -> `case _`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3141: [FLINK-5520] [table] Disable outer joins with non-...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3141#discussion_r96843797
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
 ---
@@ -375,4 +395,62 @@ class JoinITCase(
 
 Assert.assertEquals(0, result)
   }
+
+  @Test(expected = classOf[TableException])
+  def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = 
e and a > d"
+
+val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+val ds2 = 
CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
--- End diff --

This line can be removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3141: [FLINK-5520] [table] Disable outer joins with non-...

2017-01-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3141#discussion_r96856944
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
 ---
@@ -234,7 +249,38 @@ class JoinITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[ValidationException])
+  def testLeftJoinWithNotOnlyEquiJoin(): Unit = {
+val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
+val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
+
+val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+
+val results = joinT.toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFullJoinWithNotOnlyEquiJoin(): Unit = {
+val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
+val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
+
+val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+
+val results = joinT.toDataSet[Row].collect()
+  }
+
+
--- End diff --

Add tests to check that local predicates are not allowed in outer joins as 
well.
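
A rough sketch of what such a test could look like, reusing the fixtures of the 
tests above (here `'b < 3` only references the left input, i.e. it is a local 
predicate):

```scala
@Test(expected = classOf[ValidationException])
def testLeftJoinWithLocalPredicate(): Unit = {
  val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env, config)
  tEnv.getConfig.setNullCheck(true)

  val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
  val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)

  // 'b < 3 is a local predicate on the left input and should be rejected
  val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g)

  joinT.toDataSet[Row].collect()
}
```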


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5520) Disable outer joins with non-equality predicates

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829900#comment-15829900
 ] 

ASF GitHub Bot commented on FLINK-5520:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3141#discussion_r96856944
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
 ---
@@ -234,7 +249,38 @@ class JoinITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test
+  @Test(expected = classOf[ValidationException])
+  def testLeftJoinWithNotOnlyEquiJoin(): Unit = {
+val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
+val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
+
+val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+
+val results = joinT.toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFullJoinWithNotOnlyEquiJoin(): Unit = {
+val env: ExecutionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 
'b, 'c)
+val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
+
+val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
+
+val results = joinT.toDataSet[Row].collect()
+  }
+
+
--- End diff --

Add tests to check that local predicates are not allowed in outer joins as 
well.


> Disable outer joins with non-equality predicates
> 
>
> Key: FLINK-5520
> URL: https://issues.apache.org/jira/browse/FLINK-5520
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: lincoln.lee
>Priority: Blocker
> Fix For: 1.2.0
>
>
> Outer joins with non-equality predicates (and at least one equality 
> predicate) compute incorrect results. 
> Since this is not a very common requirement, I propose to disable this 
> feature for the 1.2.0 release and correctly implement it for a later version.
> The fix should add checks in the Table API validation phase (to get a good 
> error message) and in the DataSetJoinRule to prevent translation of SQL 
> queries with non-equality predicates on outer joins.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5520) Disable outer joins with non-equality predicates

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829899#comment-15829899
 ] 

ASF GitHub Bot commented on FLINK-5520:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3141#discussion_r96843825
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
 ---
@@ -375,4 +395,62 @@ class JoinITCase(
 
 Assert.assertEquals(0, result)
   }
+
+  @Test(expected = classOf[TableException])
+  def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = 
e and a > d"
+
+val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+val ds2 = 
CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testLeftOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = 
e and a > d"
+
+val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+val ds2 = 
CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
--- End diff --

remove


> Disable outer joins with non-equality predicates
> 
>
> Key: FLINK-5520
> URL: https://issues.apache.org/jira/browse/FLINK-5520
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: lincoln.lee
>Priority: Blocker
> Fix For: 1.2.0
>
>
> Outer joins with non-equality predicates (and at least one equality 
> predicate) compute incorrect results. 
> Since this is not a very common requirement, I propose to disable this 
> feature for the 1.2.0 release and correctly implement it for a later version.
> The fix should add checks in the Table API validation phase (to get a good 
> error message) and in the DataSetJoinRule to prevent translation of SQL 
> queries with non-equality predicates on outer joins.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5520) Disable outer joins with non-equality predicates

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829897#comment-15829897
 ] 

ASF GitHub Bot commented on FLINK-5520:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3141#discussion_r96841502
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -454,30 +454,54 @@ case class Join(
 
   private def testJoinCondition(expression: Expression): Unit = {
 
-def checkIfJoinCondition(exp : BinaryComparison) = exp.children match {
-case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil
-  if x.isFromLeftInput != y.isFromLeftInput => Unit
-case x => failValidation(
-  s"Invalid non-join predicate $exp. For non-join predicates use 
Table#where.")
-  }
+def checkIfJoinCondition(exp: BinaryComparison) = exp.children match {
+  case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil
+if x.isFromLeftInput != y.isFromLeftInput => true
+  case x => false
--- End diff --

`case x` -> `case _`


> Disable outer joins with non-equality predicates
> 
>
> Key: FLINK-5520
> URL: https://issues.apache.org/jira/browse/FLINK-5520
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: lincoln.lee
>Priority: Blocker
> Fix For: 1.2.0
>
>
> Outer joins with non-equality predicates (and at least one equality 
> predicate) compute incorrect results. 
> Since this is not a very common requirement, I propose to disable this 
> feature for the 1.2.0 release and correctly implement it for a later version.
> The fix should add checks in the Table API validation phase (to get a good 
> error message) and in the DataSetJoinRule to prevent translation of SQL 
> queries with non-equality predicates on outer joins.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5520) Disable outer joins with non-equality predicates

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829896#comment-15829896
 ] 

ASF GitHub Bot commented on FLINK-5520:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3141#discussion_r96856158
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
 ---
@@ -83,6 +83,27 @@ class JoinITCase(
 val env = ExecutionEnvironment.getExecutionEnvironment
 val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
+val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6"
--- End diff --

I think this test covers the same case as the one above 
(`testJoinWithFilter()`).


> Disable outer joins with non-equality predicates
> 
>
> Key: FLINK-5520
> URL: https://issues.apache.org/jira/browse/FLINK-5520
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: lincoln.lee
>Priority: Blocker
> Fix For: 1.2.0
>
>
> Outer joins with non-equality predicates (and at least one equality 
> predicate) compute incorrect results. 
> Since this is not a very common requirement, I propose to disable this 
> feature for the 1.2.0 release and correctly implement it for a later version.
> The fix should add checks in the Table API validation phase (to get a good 
> error message) and in the DataSetJoinRule to prevent translation of SQL 
> queries with non-equality predicates on outer joins.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5520) Disable outer joins with non-equality predicates

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829901#comment-15829901
 ] 

ASF GitHub Bot commented on FLINK-5520:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3141#discussion_r96843797
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
 ---
@@ -375,4 +395,62 @@ class JoinITCase(
 
 Assert.assertEquals(0, result)
   }
+
+  @Test(expected = classOf[TableException])
+  def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = 
e and a > d"
+
+val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+val ds2 = 
CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
--- End diff --

This line can be removed


> Disable outer joins with non-equality predicates
> 
>
> Key: FLINK-5520
> URL: https://issues.apache.org/jira/browse/FLINK-5520
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: lincoln.lee
>Priority: Blocker
> Fix For: 1.2.0
>
>
> Outer joins with non-equality predicates (and at least one equality 
> predicate) compute incorrect results. 
> Since this is not a very common requirement, I propose to disable this 
> feature for the 1.2.0 release and correctly implement it for a later version.
> The fix should add checks in the Table API validation phase (to get a good 
> error message) and in the DataSetJoinRule to prevent translation of SQL 
> queries with non-equality predicates on outer joins.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5520) Disable outer joins with non-equality predicates

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829898#comment-15829898
 ] 

ASF GitHub Bot commented on FLINK-5520:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3141#discussion_r96843888
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
 ---
@@ -375,4 +395,62 @@ class JoinITCase(
 
 Assert.assertEquals(0, result)
   }
+
+  @Test(expected = classOf[TableException])
+  def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val sqlQuery = "SELECT c, g FROM Table3 RIGHT OUTER JOIN Table5 ON b = 
e and a > d"
+
+val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+val ds2 = 
CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testLeftOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val sqlQuery = "SELECT c, g FROM Table3 LEFT OUTER JOIN Table5 ON b = 
e and a > d"
+
+val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+val ds2 = 
CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testFullOuterJoinWithNonEquiJoinPredicate(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+tEnv.getConfig.setNullCheck(true)
+
+val sqlQuery = "SELECT c, g FROM Table3 FULL OUTER JOIN Table5 ON b = 
e and a > d"
+
+val ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+val ds2 = 
CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
+tEnv.registerTable("Table3", ds1)
+tEnv.registerTable("Table5", ds2)
+
+tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
--- End diff --

remove


> Disable outer joins with non-equality predicates
> 
>
> Key: FLINK-5520
> URL: https://issues.apache.org/jira/browse/FLINK-5520
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: lincoln.lee
>Priority: Blocker
> Fix For: 1.2.0
>
>
> Outer joins with non-equality predicates (and at least one equality 
> predicate) compute incorrect results. 
> Since this is not a very common requirement, I propose to disable this 
> feature for the 1.2.0 release and correctly implement it for a later version.
> The fix should add checks in the Table API validation phase (to get a good 
> error message) and in the DataSetJoinRule to prevent translation of SQL 
> queries with non-equality predicates on outer joins.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5572) ListState in SlidingEventTimeWindow and SlidingProcessingTimeWindow optimization

2017-01-19 Thread Syinchwun Leo (JIRA)
Syinchwun Leo created FLINK-5572:


 Summary: ListState in SlidingEventTimeWindow and 
SlidingProcessingTimeWindow optimization
 Key: FLINK-5572
 URL: https://issues.apache.org/jira/browse/FLINK-5572
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.1.4, 1.2.0
 Environment: CentOS 7.2
Reporter: Syinchwun Leo


When using ListState in SlidingEventTimeWindow and SlidingProcessingTimeWindow, 
an element may be assigned to multiple overlapping windows. This can lead to 
excessive storage consumption. For example, with 
window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2))).apply(UDF 
window function), each element is assigned to 5 windows, as shown in the following 
figure. When the window size is very large (i.e., size/slide is very large), this 
is unacceptable.
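
In lieu of the figure, a minimal, self-contained sketch of such a job (object and 
names are mine, not from the Flink code base); every record is buffered for 
size/slide = 10s / 2s = 5 overlapping panes:

{code}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object SlidingWindowStorageSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.fromElements(("a", 1000L), ("a", 3000L), ("a", 5000L))
      .assignAscendingTimestamps(_._2)
      .keyBy(_._1)
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
      // A non-incremental window function forces the runtime to keep the full
      // element list (ListState) per window pane, so each record is stored 5 times.
      .apply { (key: String, w: TimeWindow, elems: Iterable[(String, Long)], out: Collector[Int]) =>
        out.collect(elems.size)
      }
      .print()

    env.execute("sliding window storage sketch")
  }
}
{code}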

We plan to make a little optimization, and the doc is in 
https://docs.google.com/document/d/1HCt1Si3YNGFwsl2H5SO0f7WD69DdBBPVJA6abd3oFWo/edit?usp=sharing
 

Comments?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5572) ListState in SlidingEventTimeWindow and SlidingProcessingTimeWindow optimization

2017-01-19 Thread Syinchwun Leo (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Syinchwun Leo updated FLINK-5572:
-
Description: 
When using ListState in SlidingEventTimeWindow and SlidingProcessingTimeWindow, 
an element may be assigned to multiple overlapping windows. This can lead to 
excessive storage consumption. For example, with 
window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2))).apply(UDF 
window function), each element is assigned to 5 windows. When the window size 
is very large (i.e., size/slide is very large), this is unacceptable.

We plan to make a little optimization, and the doc is in 
https://docs.google.com/document/d/1HCt1Si3YNGFwsl2H5SO0f7WD69DdBBPVJA6abd3oFWo/edit?usp=sharing
 

Comments?

  was:
When using ListState in SlidingEventTimeWindow and SlidingProcessingTimeWindow, 
an element may be assigned to multiple overlapping windows. This can lead to 
excessive storage consumption. For example, with 
window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2))).apply(UDF 
window function), each element is assigned to 5 windows, as shown in the following 
figure. When the window size is very large (i.e., size/slide is very large), this 
is unacceptable.

We plan to make a little optimization, and the doc is in 
https://docs.google.com/document/d/1HCt1Si3YNGFwsl2H5SO0f7WD69DdBBPVJA6abd3oFWo/edit?usp=sharing
 

Comments?


> ListState in SlidingEventTimeWindow and SlidingProcessingTimeWindow 
> optimization
> 
>
> Key: FLINK-5572
> URL: https://issues.apache.org/jira/browse/FLINK-5572
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0, 1.1.4
> Environment: CentOS 7.2
>Reporter: Syinchwun Leo
>
> When using ListState in SlidingEventTimeWindow and 
> SlidingProcessingTimeWindow, an element may be assigned to multiple 
> overlapping windows. This can lead to excessive storage consumption. For example, 
> with window(SlidingEventTimeWindows.of(Time.seconds(10), 
> Time.seconds(2))).apply(UDF window function), each element is assigned to 5 
> windows. When the window size is very large (i.e., size/slide is very large), 
> this is unacceptable.
> We plan to make a little optimization, and the doc is in 
> https://docs.google.com/document/d/1HCt1Si3YNGFwsl2H5SO0f7WD69DdBBPVJA6abd3oFWo/edit?usp=sharing
>  
> Comments?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5377) Improve savepoint docs

2017-01-19 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-5377.
--
   Resolution: Fixed
Fix Version/s: 1.3.0
   1.2.0

Fixed in 57a3955 (release-1.2), 6d8b3f5 (master).

> Improve savepoint docs
> --
>
> Key: FLINK-5377
> URL: https://issues.apache.org/jira/browse/FLINK-5377
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.2.0, 1.3.0
>
>
> The savepoint docs are very detailed and focus on the internals. They should 
> better convey what users have to take care of.
> The following questions should be answered:
> What happens if I add a new operator that requires state to my flow?
> What happens if I delete an operator that has state to my flow?
> What happens if I reorder stateful operators in my flow?
> What happens if I add or delete or reorder operators that have no state in my 
> flow?
> Should I apply .uid to all operators in my flow?
> Should I apply .uid to only the operators that have state?
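
For reference, a small sketch (an assumed job with made-up IDs, not from the issue) 
of the uid(...) usage the docs should explain; stable IDs let a savepoint be mapped 
back to the right operators after the program changes:

{code}
import org.apache.flink.streaming.api.scala._

object UidSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.socketTextStream("localhost", 9999)
      .flatMap(_.split("\\s+"))
      .map(w => (w, 1)).uid("word-mapper")   // stateless: uid optional but harmless
      .keyBy(0)
      .sum(1).uid("word-count")              // stateful: a stable uid is important
      .print()

    env.execute("uid sketch")
  }
}
{code}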



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3165: [FLINK-5560] [webfrontend] Align table header columns wit...

2017-01-19 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3165
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5560) Header in checkpoint stats summary misaligned

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829948#comment-15829948
 ] 

ASF GitHub Bot commented on FLINK-5560:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3165
  
+1


> Header in checkpoint stats summary misaligned
> -
>
> Key: FLINK-5560
> URL: https://issues.apache.org/jira/browse/FLINK-5560
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>Priority: Minor
>
> The checkpoint summary stats table header line is misaligned. The first and 
> second header columns need to be swapped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5573) Kafka08ITCase.testStartFromZookeeperCommitOffsets fails on Travis

2017-01-19 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5573:


 Summary: Kafka08ITCase.testStartFromZookeeperCommitOffsets fails 
on Travis
 Key: FLINK-5573
 URL: https://issues.apache.org/jira/browse/FLINK-5573
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.3.0
Reporter: Till Rohrmann
Priority: Critical


The test case {{Kafka08ITCase.testStartFromZookeeperCommitOffsets}} fails on 
Travis with 

{code}
org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Job execution failed.
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392)
at 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
at 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32)
at 
org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runStartFromKafkaCommitOffsets(KafkaConsumerTestBase.java:323)
at 
org.apache.flink.streaming.connectors.kafka.Kafka08ITCase.testStartFromZookeeperCommitOffsets(Kafka08ITCase.java:147)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:905)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:848)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:848)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error while confirming checkpoint
at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1177)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: instance must be started before 
calling this method
at 
org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
at 
org.apache.curator.framework.imps.CuratorFrameworkImpl.setData(CuratorFrameworkImpl.java:371)
at 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:135)
at 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.prepareAndCommitOffsets(Zooke

[GitHub] flink issue #3077: [FLINK-5423] Implement Stochastic Outlier Selection

2017-01-19 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3077
  
Changes look really good to me :-) Travis is passing modulo an unrelated 
test failure. Will merge this PR. Thanks a lot for your contribution @Fokko.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5423) Implement Stochastic Outlier Selection

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829952#comment-15829952
 ] 

ASF GitHub Bot commented on FLINK-5423:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/3077
  
Changes look really good to me :-) Travis is passing modulo an unrelated 
test failure. Will merge this PR. Thanks a lot for your contribution @Fokko.


> Implement Stochastic Outlier Selection
> --
>
> Key: FLINK-5423
> URL: https://issues.apache.org/jira/browse/FLINK-5423
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Fokko Driesprong
>Assignee: Fokko Driesprong
>
> I've implemented the Stochastic Outlier Selection (SOS) algorithm by Jeroen 
> Janssens.
> http://jeroenjanssens.com/2013/11/24/stochastic-outlier-selection.html
> It is integrated as much as possible with the components from the machine learning 
> library.
> The algorithm itself has been compared to four other algorithms, and the comparison 
> shows that SOS performs better on most of these real-world datasets. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3170: [FLINK-5555] Document how to debug event time usin...

2017-01-19 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/3170

[FLINK-5555] Document how to debug event time using watermarks 

I've also added a section to the Kafka connector's page.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3170


commit 3fe291ab96f93efd52fe59d6268c27f76d36847f
Author: Robert Metzger 
Date:   2017-01-19T13:44:05Z

[FLINK-5555] Document how to debug event time using watermarks (+ kafka 
metrics)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5555) Add documentation about debugging watermarks

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829954#comment-15829954
 ] 

ASF GitHub Bot commented on FLINK-5555:
---

GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/3170

[FLINK-5555] Document how to debug event time using watermarks 

I've also added a section to the Kafka connector's page.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink flink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3170


commit 3fe291ab96f93efd52fe59d6268c27f76d36847f
Author: Robert Metzger 
Date:   2017-01-19T13:44:05Z

[FLINK-5555] Document how to debug event time using watermarks (+ kafka 
metrics)




> Add documentation about debugging watermarks
> 
>
> Key: FLINK-5555
> URL: https://issues.apache.org/jira/browse/FLINK-5555
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.2.0
>
>
> This was a frequent question on the mailing list.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3156: [FLINK-5496] [mesos] Relocate Mesos Protobuf dependency t...

2017-01-19 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3156
  
+1 to merge (I assume you've tested this)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5496) ClassCastException when using Mesos HA mode

2017-01-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829960#comment-15829960
 ] 

ASF GitHub Bot commented on FLINK-5496:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/3156
  
+1 to merge (I assume you've tested this)


> ClassCastException when using Mesos HA mode
> ---
>
> Key: FLINK-5496
> URL: https://issues.apache.org/jira/browse/FLINK-5496
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.2.0, 1.3.0
>
>
> When using Mesos' HA mode, one cannot start the Mesos appmaster because 
> the following ClassCastException occurs:
> {code}
> java.lang.ClassCastException: 
> org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl
>  cannot be cast to 
> org.apache.flink.mesos.shaded.org.apache.curator.framework.CuratorFramework
>   at 
> org.apache.flink.mesos.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:38)
>   at 
> org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.createWorkerStore(MesosApplicationMasterRunner.java:510)
>   at 
> org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.runPrivileged(MesosApplicationMasterRunner.java:320)
>   at 
> org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner$1.call(MesosApplicationMasterRunner.java:178)
>   at 
> org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner$1.call(MesosApplicationMasterRunner.java:175)
>   at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>   at 
> org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.run(MesosApplicationMasterRunner.java:175)
>   at 
> org.apache.flink.mesos.runtime.clusterframework.MesosApplicationMasterRunner.main(MesosApplicationMasterRunner.java:135)
> {code}
> It seems as if the {{flink-mesos}} module relocates the curator dependency into 
> a different namespace than {{flink-runtime}} does. It is not clear why this is done. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5573) Kafka08ITCase.testStartFromZookeeperCommitOffsets fails on Travis

2017-01-19 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15829962#comment-15829962
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5573:


Not sure if it's a duplicate, but this seems related: FLINK-4905.

> Kafka08ITCase.testStartFromZookeeperCommitOffsets fails on Travis
> -
>
> Key: FLINK-5573
> URL: https://issues.apache.org/jira/browse/FLINK-5573
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The test case {{Kafka08ITCase.testStartFromZookeeperCommitOffsets}} fails on 
> Travis with 
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:392)
>   at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:209)
>   at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:173)
>   at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:32)
>   at 
> org.apache.flink.streaming.connectors.kafka.KafkaConsumerTestBase.runStartFromKafkaCommitOffsets(KafkaConsumerTestBase.java:323)
>   at 
> org.apache.flink.streaming.connectors.kafka.Kafka08ITCase.testStartFromZookeeperCommitOffsets(Kafka08ITCase.java:147)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:483)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:905)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:848)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:848)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Error while confirming checkpoint
>   at org.apache.flink.runtime.taskmanager.Task$4.run(Task.java:1177)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: instance must be started before 
> calling this method
>   at 
> org.apache.flink.shaded.com.go
