[jira] [Commented] (FLINK-10218) Allow writing DataSet without explicit path parameter

2018-08-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593154#comment-16593154
 ] 

ASF GitHub Bot commented on FLINK-10218:


link3280 opened a new pull request #6616: [FLINK-10218] Allow writing DataSet 
without explicit path parameter
URL: https://github.com/apache/flink/pull/6616
 
 
   ## What is the purpose of the change
   
   Add a file output helper method, which requires only a FileOutputFormat 
parameter, to the DataSet API. This avoids setting duplicate path parameters, 
since the output path can already be found in the FileOutputFormat.
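   
   For illustration only, a minimal usage sketch under the assumption that the new helper 
takes just the `FileOutputFormat`; the exact signature in this PR may differ:
   
   ```java
   // Hypothetical helper on DataSet<T>, roughly: public DataSink<T> write(FileOutputFormat<T> format)
   // Assumes org.apache.flink.api.java.io.TextOutputFormat and org.apache.flink.core.fs.Path.
   // The output path is configured exactly once, inside the format itself.
   String output = "hdfs:///tmp/";
   dataset.write(new TextOutputFormat<>(new Path(output)));
   ```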
   
   ## Brief change log
   
 - *Add a file output helper method, which requires only a FileOutputFormat 
parameter, to the DataSet API.*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow writing DataSet without explicit path parameter
> -
>
> Key: FLINK-10218
> URL: https://issues.apache.org/jira/browse/FLINK-10218
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Affects Versions: 1.6.0
>Reporter: Paul Lin
>Priority: Minor
>  Labels: pull-request-available
>
> Currently, the DataSet API has two overloaded `write` methods that use 
> FileOutputFormat as the output format, and both require a path parameter, but the 
> output path may already be set in the FileOutputFormat object. What's more, 
> the subclasses of FileOutputFormat mostly don't have default constructors and 
> require a path parameter too, so users have to set the output path twice in the 
> code, like:
> {code:java}
>   String output = "hdfs:///tmp/";
>   dataset.write(new TextOutputFormat<>(new Path(output)), output);
> {code}
> So I propose to add another write helper method that requires no path 
> parameter. Could someone assign this issue to me?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10218) Allow writing DataSet without explicit path parameter

2018-08-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10218:
---
Labels: pull-request-available  (was: )

> Allow writing DataSet without explicit path parameter
> -
>
> Key: FLINK-10218
> URL: https://issues.apache.org/jira/browse/FLINK-10218
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Affects Versions: 1.6.0
>Reporter: Paul Lin
>Priority: Minor
>  Labels: pull-request-available
>
> Currently, the DataSet API has two overloaded `write` methods that use 
> FileOutputFormat as the output format, and both require a path parameter, but the 
> output path may already be set in the FileOutputFormat object. What's more, 
> the subclasses of FileOutputFormat mostly don't have default constructors and 
> require a path parameter too, so users have to set the output path twice in the 
> code, like:
> {code:java}
>   String output = "hdfs:///tmp/";
>   dataset.write(new TextOutputFormat<>(new Path(output)), output);
> {code}
> So I propose to add another write helper method that requires no path 
> parameter. Could someone assign this issue to me?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] link3280 opened a new pull request #6616: [FLINK-10218] Allow writing DataSet without explicit path parameter

2018-08-26 Thread GitBox
link3280 opened a new pull request #6616: [FLINK-10218] Allow writing DataSet 
without explicit path parameter
URL: https://github.com/apache/flink/pull/6616
 
 
   ## What is the purpose of the change
   
   Add a file output helper method, which requires only a FileOutputFormat 
parameter, to the DataSet API. This avoids setting duplicate path parameters, 
since the output path can already be found in the FileOutputFormat.
   
   ## Brief change log
   
 - *Add a file output helper method, which requires only a FileOutputFormat 
parameter, to the DataSet API.*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10218) Allow writing DataSet without explicit path parameter

2018-08-26 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593135#comment-16593135
 ] 

vinoyang commented on FLINK-10218:
--

[~Paul Lin] You may not have contributor permissions yet; pinging 
[~till.rohrmann] [~Zentol]

> Allow writing DataSet without explicit path parameter
> -
>
> Key: FLINK-10218
> URL: https://issues.apache.org/jira/browse/FLINK-10218
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Affects Versions: 1.6.0
>Reporter: Paul Lin
>Priority: Minor
>
> Currently, the DataSet API has two overloaded `write` methods that use 
> FileOutputFormat as the output format, and both require a path parameter, but the 
> output path may already be set in the FileOutputFormat object. What's more, 
> the subclasses of FileOutputFormat mostly don't have default constructors and 
> require a path parameter too, so users have to set the output path twice in the 
> code, like:
> {code:java}
>   String output = "hdfs:///tmp/";
>   dataset.write(new TextOutputFormat<>(new Path(output)), output);
> {code}
> So I propose to add another write helper method that requires no path 
> parameter. Could someone assign this issue to me?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10170) Support string representation for map types in descriptor-based Table API

2018-08-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593132#comment-16593132
 ] 

ASF GitHub Bot commented on FLINK-10170:


tragicjun commented on issue #6578: [FLINK-10170] [table] Support string 
representation for map types in descriptor-based Table API
URL: https://github.com/apache/flink/pull/6578#issuecomment-416097515
 
 
   @twalthr could you review this again? Is there anything else I should do before the 
merge? Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support string representation for map types in descriptor-based Table API
> -
>
> Key: FLINK-10170
> URL: https://issues.apache.org/jira/browse/FLINK-10170
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Jun Zhang
>Assignee: Jun Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
> Attachments: flink-10170
>
>
> Since 1.6 the recommended way of creating source/sink table is using 
> connector/format/schema/ descriptors. However, when declaring map types in 
> the schema descriptor, the following exception would be thrown:
> {quote}org.apache.flink.table.api.TableException: A string representation for 
> map types is not supported yet.{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tragicjun commented on issue #6578: [FLINK-10170] [table] Support string representation for map types in descriptor-based Table API

2018-08-26 Thread GitBox
tragicjun commented on issue #6578: [FLINK-10170] [table] Support string 
representation for map types in descriptor-based Table API
URL: https://github.com/apache/flink/pull/6578#issuecomment-416097515
 
 
   @twalthr could you review this again? Is there anything else I should do before the 
merge? Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10216) Add REGEXP_MATCH in TableAPI and SQL

2018-08-26 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593129#comment-16593129
 ] 

vinoyang commented on FLINK-10216:
--

[~juho.autio.r] Flink already has an operator for regular-expression matching, called 
"SIMILAR TO", which is provided by Apache Calcite. Reference documentation: 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/sql.html#built-in-functions

> Add REGEXP_MATCH in TableAPI and SQL
> 
>
> Key: FLINK-10216
> URL: https://issues.apache.org/jira/browse/FLINK-10216
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Juho Autio
>Priority: Major
>
> Here's a naive implementation:
> {code:java}
> public class RegexpMatchFunction extends ScalarFunction {
> // NOTE! Flink calls eval() by reflection
> public boolean eval(String value, String pattern) {
> return value != null && pattern != null && value.matches(pattern);
> }
> }
> {code}
> I wonder if there would be a way to optimize this to use 
> {{Pattern.compile(pattern)}} and reuse the compiled Pattern for multiple calls 
> (possibly different values, but the same pattern).
> h3. Naming
> Should regex functions be prefixed with {{regexp_}} or {{regex_}}? See also: 
> [https://github.com/apache/flink/pull/6448#issuecomment-415972833]
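
A hedged sketch of the pattern-reuse idea from the description above: compiled patterns 
are cached per pattern string inside the function instance. The class name matches the 
naive version; everything else (the cache, the open() hook) is an assumption, not the 
eventual implementation.

{code:java}
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class RegexpMatchFunction extends ScalarFunction {

    // One cache per function instance; each pattern string is compiled at most once.
    private transient Map<String, Pattern> patternCache;

    @Override
    public void open(FunctionContext context) {
        patternCache = new HashMap<>();
    }

    public boolean eval(String value, String pattern) {
        if (value == null || pattern == null) {
            return false;
        }
        return patternCache
            .computeIfAbsent(pattern, Pattern::compile)
            .matcher(value)
            .matches();
    }
}
{code}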



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9990) Add regex_extract supported in TableAPI and SQL

2018-08-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593121#comment-16593121
 ] 

ASF GitHub Bot commented on FLINK-9990:
---

yanghua commented on issue #6448: [FLINK-9990] [table] Add regex_extract 
supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#issuecomment-416095640
 
 
   @juhoautio Yes, you are right, I agree!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add regex_extract supported in TableAPI and SQL
> ---
>
> Key: FLINK-9990
> URL: https://issues.apache.org/jira/browse/FLINK-9990
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> regex_extract is a very useful function: it returns a string based on a regex 
> pattern and an index.
> For example: 
> {code:java}
> regexp_extract('foothebar', 'foo(.*?)(bar)', 2) // returns 'bar'
> {code}
> It is provided as a UDF in Hive; for more details, please see [1].
> [1]: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
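
A hedged sketch of how this could look as a Flink scalar UDF; the class name, null 
handling, and no-match behavior here are assumptions, not the merged implementation:

{code:java}
import org.apache.flink.table.functions.ScalarFunction;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexpExtractFunction extends ScalarFunction {

    // Returns the capture group at extractIndex, or null when the pattern does not match.
    public String eval(String value, String regex, int extractIndex) {
        if (value == null || regex == null) {
            return null;
        }
        Matcher matcher = Pattern.compile(regex).matcher(value);
        return matcher.find() ? matcher.group(extractIndex) : null;
    }
}
{code}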



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6448: [FLINK-9990] [table] Add regex_extract supported in TableAPI and SQL

2018-08-26 Thread GitBox
yanghua commented on issue #6448: [FLINK-9990] [table] Add regex_extract 
supported in TableAPI and SQL
URL: https://github.com/apache/flink/pull/6448#issuecomment-416095640
 
 
   @juhoautio Yes, you are right, I agree!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9311) PubSub connector

2018-08-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593118#comment-16593118
 ] 

ASF GitHub Bot commented on FLINK-9311:
---

yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r212852318
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
 ##
 @@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import 
org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+import java.io.IOException;
+
+/**
+ * A sink function that outputs to PubSub.
+ *
+ * @param  type of PubSubSink messages to write
+ */
+public class PubSubSink extends RichSinkFunction {
+
+   private SerializableCredentialsProvider serializableCredentialsProvider;
+   private SerializationSchema serializationSchema;
+   private String projectName;
+   private String topicName;
+   private String hostAndPort = null;
+
+   private transient Publisher publisher;
+
+   private PubSubSink() {
 
 Review comment:
   Sorry, I missed your `PubSubSinkBuilder`. I just saw that you have done a 
lot of NPE checks in the `initialize` method of this class. If these are 
mandatory fields, you could pass them directly in the constructor of 
`PubSubSinkBuilder`. Maybe this could save these checks?
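   
   A hedged sketch of that suggestion; the builder and field names are taken from the 
surrounding discussion, and the rest is an assumption rather than the actual PR code:
   
   ```java
   import java.util.Objects;
   
   // Hypothetical builder shape: mandatory fields are constructor arguments,
   // so the null checks happen once here instead of inside initialize().
   class PubSubSinkBuilder<IN> {
       private final String projectName;
       private final String topicName;
       private String hostAndPort; // optional, may stay null
   
       PubSubSinkBuilder(String projectName, String topicName) {
           this.projectName = Objects.requireNonNull(projectName, "projectName");
           this.topicName = Objects.requireNonNull(topicName, "topicName");
       }
   
       PubSubSinkBuilder<IN> withHostAndPort(String hostAndPort) {
           this.hostAndPort = hostAndPort;
           return this;
       }
   }
   ```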


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> PubSub connector
> 
>
> Key: FLINK-9311
> URL: https://issues.apache.org/jira/browse/FLINK-9311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Richard Deurwaarder
>Assignee: Niels Basjes
>Priority: Minor
>  Labels: pull-request-available
>
> I would like to start adding some Google Cloud connectors, starting with a PubSub 
> source. I have a basic implementation ready, but I want it to be able to:
>  * easily scale up (should I have it extend RichParallelSourceFunction for 
> this?)
>  * make it easier to provide the Google Cloud credentials. This would require 
> being able to send some JSON string / ServiceAccount to the nodes when 
> starting up this source.
> Could this be something that would be useful for others and added to the 
> flink connectors repo?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-26 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r212852318
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/PubSubSink.java
 ##
 @@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import 
org.apache.flink.streaming.connectors.pubsub.common.SerializableCredentialsProvider;
+
+import com.google.api.gax.core.CredentialsProvider;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.TransportChannel;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.ProjectTopicName;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+import java.io.IOException;
+
+/**
+ * A sink function that outputs to PubSub.
+ *
+ * @param  type of PubSubSink messages to write
+ */
+public class PubSubSink extends RichSinkFunction {
+
+   private SerializableCredentialsProvider serializableCredentialsProvider;
+   private SerializationSchema serializationSchema;
+   private String projectName;
+   private String topicName;
+   private String hostAndPort = null;
+
+   private transient Publisher publisher;
+
+   private PubSubSink() {
 
 Review comment:
   Sorry, I missed your `PubSubSinkBuilder`. I just saw that you have done a 
lot of NPE checks in the `initialize` method of this class. If these are 
mandatory fields, you could pass them directly in the constructor of 
`PubSubSinkBuilder`. Maybe this could save these checks?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9311) PubSub connector

2018-08-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593115#comment-16593115
 ] 

ASF GitHub Bot commented on FLINK-9311:
---

yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r212851768
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Timer;
+import java.util.TimerTask;
+
+class Bound implements Serializable {
+   private static final Logger LOG = LoggerFactory.getLogger(Bound.class);
+
+   private final Bound.Mode mode;
+   private final long maxMessagedReceived;
+   private final long maxTimeBetweenMessages;
+
+   private SourceFunction sourceFunction;
+   private transient Timer timer;
+   private long messagesReceived;
+   private long lastReceivedMessage;
+   private boolean cancelled = false;
+
+   private Bound(Bound.Mode mode, long maxMessagedReceived, long 
maxTimeBetweenMessages) {
+   this.mode = mode;
+   this.maxMessagedReceived = maxMessagedReceived;
+   this.maxTimeBetweenMessages = maxTimeBetweenMessages;
+   this.messagesReceived = 0L;
+   }
+
+   static  Bound boundByAmountOfMessages(long 
maxMessagedReceived) {
+   return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L);
+   }
+
+   static  Bound boundByTimeSinceLastMessage(long 
maxTimeBetweenMessages) {
+   return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages);
+   }
+
+   static  Bound 
boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long 
maxTimeBetweenMessages) {
+   return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, 
maxTimeBetweenMessages);
+   }
+
+   private TimerTask shutdownPubSubSource() {
+   return new TimerTask() {
+   @Override
+   public void run() {
+   if (maxTimeBetweenMessagesElapsed()) {
+   
cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source");
+   timer.cancel();
+   }
+   }
+   };
+   }
+
+   private synchronized boolean maxTimeBetweenMessagesElapsed() {
+   return System.currentTimeMillis() - lastReceivedMessage > 
maxTimeBetweenMessages;
+   }
+
+   private synchronized void cancelPubSubSource(String logMessage) {
+   if (!cancelled) {
+   cancelled = true;
+   sourceFunction.cancel();
+   LOG.info(logMessage);
+   }
+   }
+
+   void start(SourceFunction sourceFunction) {
+   if (this.sourceFunction != null) {
+   throw new IllegalStateException("start() already 
called");
+   }
+
+   this.sourceFunction = sourceFunction;
+   messagesReceived = 0;
+
+   if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) {
+   lastReceivedMessage = System.currentTimeMillis();
+   timer = new Timer();
+   timer.schedule(shutdownPubSubSource(), 0, 100);
+   }
+   }
+
+   synchronized void receivedMessage() {
+   if (sourceFunction == null) {
+   throw new IllegalStateException("start() not called");
+   }
+
+   lastReceivedMessage = System.currentTimeMillis();
+   messagesReceived++;
 
 Review comment:
   yes, I mean to expose 

[GitHub] yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-26 Thread GitBox
yanghua commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r212851768
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Timer;
+import java.util.TimerTask;
+
+class Bound implements Serializable {
+   private static final Logger LOG = LoggerFactory.getLogger(Bound.class);
+
+   private final Bound.Mode mode;
+   private final long maxMessagedReceived;
+   private final long maxTimeBetweenMessages;
+
+   private SourceFunction sourceFunction;
+   private transient Timer timer;
+   private long messagesReceived;
+   private long lastReceivedMessage;
+   private boolean cancelled = false;
+
+   private Bound(Bound.Mode mode, long maxMessagedReceived, long 
maxTimeBetweenMessages) {
+   this.mode = mode;
+   this.maxMessagedReceived = maxMessagedReceived;
+   this.maxTimeBetweenMessages = maxTimeBetweenMessages;
+   this.messagesReceived = 0L;
+   }
+
+   static  Bound boundByAmountOfMessages(long 
maxMessagedReceived) {
+   return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L);
+   }
+
+   static  Bound boundByTimeSinceLastMessage(long 
maxTimeBetweenMessages) {
+   return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages);
+   }
+
+   static  Bound 
boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long 
maxTimeBetweenMessages) {
+   return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, 
maxTimeBetweenMessages);
+   }
+
+   private TimerTask shutdownPubSubSource() {
+   return new TimerTask() {
+   @Override
+   public void run() {
+   if (maxTimeBetweenMessagesElapsed()) {
+   
cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source");
+   timer.cancel();
+   }
+   }
+   };
+   }
+
+   private synchronized boolean maxTimeBetweenMessagesElapsed() {
+   return System.currentTimeMillis() - lastReceivedMessage > 
maxTimeBetweenMessages;
+   }
+
+   private synchronized void cancelPubSubSource(String logMessage) {
+   if (!cancelled) {
+   cancelled = true;
+   sourceFunction.cancel();
+   LOG.info(logMessage);
+   }
+   }
+
+   void start(SourceFunction sourceFunction) {
+   if (this.sourceFunction != null) {
+   throw new IllegalStateException("start() already 
called");
+   }
+
+   this.sourceFunction = sourceFunction;
+   messagesReceived = 0;
+
+   if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) {
+   lastReceivedMessage = System.currentTimeMillis();
+   timer = new Timer();
+   timer.schedule(shutdownPubSubSource(), 0, 100);
+   }
+   }
+
+   synchronized void receivedMessage() {
+   if (sourceFunction == null) {
+   throw new IllegalStateException("start() not called");
+   }
+
+   lastReceivedMessage = System.currentTimeMillis();
+   messagesReceived++;
 
 Review comment:
   Yes, I mean to expose the number of messages received as a metric. Just an 
idea; I'm not sure if it is a good suggestion.
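   
   A hedged sketch of that idea, registering a counter through the rich function's metric 
group. It assumes the PubSub source is (or wraps) a `RichSourceFunction`; the class and 
metric names are made up for illustration:
   
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.metrics.Counter;
   import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
   
   // Illustration only: counts every message pulled from PubSub as a Flink metric.
   public abstract class MeteredPubSubSource<OUT> extends RichSourceFunction<OUT> {
   
       private transient Counter messagesReceived;
   
       @Override
       public void open(Configuration parameters) {
           // Registered under the operator's metric group.
           messagesReceived = getRuntimeContext().getMetricGroup().counter("messagesReceived");
       }
   
       // To be called by the subclass for every received message.
       protected void onMessageReceived() {
           messagesReceived.inc();
       }
   }
   ```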


This is an automated message from the Apache Git Service.
To respond to the 

[jira] [Updated] (FLINK-10218) Allow writing DataSet without explicit path parameter

2018-08-26 Thread Paul Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paul Lin updated FLINK-10218:
-
Description: 
Currently, the DataSet API has two overloaded `write` methods that use 
FileOutputFormat as the output format, and both require a path parameter, but the 
output path may already be set in the FileOutputFormat object. What's more, 
the subclasses of FileOutputFormat mostly don't have default constructors and 
require a path parameter too, so users have to set the output path twice in the 
code, like:
{code:java}
  String output = "hdfs:///tmp/";
  dataset.write(new TextOutputFormat<>(new Path(output)), output);
{code}
So I propose to add another write helper method that requires no path 
parameter. Could someone assign this issue to me?

  was:
Currently, the DataSet API has two overloaded write methods that use 
FileOutputFormat as the output format, and both require a path parameter, but the 
output path may already be set in the FileOutputFormat object. What's more, 
the subclasses of FileOutputFormat mostly don't have default constructors and 
require a path parameter too, so users have to set the output path twice in the 
code, like:
{code:java}
  String output = "hdfs:///tmp/";
  dataset.write(new TextOutputFormat<>(new Path(output)), output);
{code}
So I propose to add another write method that requires no path parameter. Could 
someone assign this issue to me?

Summary: Allow writing DataSet without explicit path parameter  (was: 
Allow write DataSet without path parameter)

> Allow writing DataSet without explicit path parameter
> -
>
> Key: FLINK-10218
> URL: https://issues.apache.org/jira/browse/FLINK-10218
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Affects Versions: 1.6.0
>Reporter: Paul Lin
>Priority: Minor
>
> Currently, the DataSet API has two overloaded `write` methods that use 
> FileOutputFormat as the output format, and both require a path parameter, but the 
> output path may already be set in the FileOutputFormat object. What's more, 
> the subclasses of FileOutputFormat mostly don't have default constructors and 
> require a path parameter too, so users have to set the output path twice in the 
> code, like:
> {code:java}
>   String output = "hdfs:///tmp/";
>   dataset.write(new TextOutputFormat<>(new Path(output)), output);
> {code}
> So I propose to add another write helper method that requires no path 
> parameter. Could someone assign this issue to me?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-26 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593082#comment-16593082
 ] 

JIN SUN commented on FLINK-10205:
-

Great, Stephan. I actually had the same idea. For batch failover, there are 
a bunch of things we need to consider and discuss. I can prepare a FLIP. 

> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve better 
> performance. However, when a DataSourceTask fails and is rerun, it will not get 
> the same splits as its previous attempt; this will introduce inconsistent 
> results or even data corruption.
> Furthermore, if two executions run at the same time (in a batch 
> scenario), these two executions should process the same splits.
> We need to fix this issue to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits in the ExecutionVertex, and the 
> DataSourceTask will pull splits from there.
>  
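
A hedged, purely illustrative sketch of the proposal; every name below is made up, and 
the real change would live in the JobManager/ExecutionVertex code paths:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Remember the splits handed to an execution vertex so that a restarted attempt
// (or a second, concurrent attempt) replays exactly the same splits in the same order.
class SplitAssignmentLog<S> {

    private final List<S> assignedSplits = new ArrayList<>();
    private int replayPosition;

    // First attempt: record the split pulled from the InputSplitAssigner before handing it out.
    synchronized S record(S split) {
        assignedSplits.add(split);
        return split;
    }

    // Later attempts: replay the recorded splits instead of asking the assigner again.
    synchronized S nextReplayedSplit() {
        return replayPosition < assignedSplits.size() ? assignedSplits.get(replayPosition++) : null;
    }
}
{code}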



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-08-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16593020#comment-16593020
 ] 

ASF GitHub Bot commented on FLINK-8354:
---

alexeyt820 opened a new pull request #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615
 
 
   
   ## What is the purpose of the change
   This change allows access to Kafka headers in KeyedDeserializationSchema, 
and provides Kafka headers via the KeyedSerializationSchema.headers method.
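   
   A hedged sketch of the backwards-compatible default-method approach on the consumer 
side; the existing five-argument `deserialize` signature is the real one, but the header 
type used in the new overload is an assumption, not necessarily what this PR uses:
   
   ```java
   import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
   
   import java.io.IOException;
   import java.io.Serializable;
   import java.util.Map;
   
   public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
   
       // Existing method, unchanged.
       T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
   
       // New default overload: existing schemas keep compiling, header-aware schemas override it.
       default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset,
                             Iterable<Map.Entry<String, byte[]>> headers) throws IOException {
           return deserialize(messageKey, message, topic, partition, offset);
       }
   
       boolean isEndOfStream(T nextElement);
   }
   ```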
   
   ## Brief change log
   - Add default _headers_ method to KeyedSerializationSchema interface
   - Add default deserialize method with _headers_ argument to 
KeyedDeserializationSchema
   - Adapt Kafka011Fetcher to get headers from the Kafka ConsumerRecord and provide 
them as a parameter to KeyedDeserializationSchema
   - Modify FlinkKafkaProducer011 to request _headers_ from 
KeyedSerializationSchema and add them to ProducerRecord.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - *Added integration test Kafka011ITCase.testHeaders to verify that we 
can produce headers and consume them*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (yes)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced the notion of headers for messages in version 0.11.0.0: 
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11, which supports Kafka 0.11.0.0, ignores 
> headers when consuming Kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers in FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8354) Flink Kafka connector ignores Kafka message headers

2018-08-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8354:
--
Labels: pull-request-available  (was: )

> Flink Kafka connector ignores Kafka message  headers 
> -
>
> Key: FLINK-8354
> URL: https://issues.apache.org/jira/browse/FLINK-8354
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
> Environment: Kafka 0.11.0.0
> Flink 1.4.0
> flink-connector-kafka-0.11_2.11 
>Reporter: Mohammad Abareghi
>Assignee: Aegeaner
>Priority: Major
>  Labels: pull-request-available
>
> Kafka has introduced the notion of headers for messages in version 0.11.0.0: 
> https://issues.apache.org/jira/browse/KAFKA-4208.
> But flink-connector-kafka-0.11_2.11, which supports Kafka 0.11.0.0, ignores 
> headers when consuming Kafka messages. 
> It would be useful in some scenarios, such as distributed log tracing, to 
> support message headers in FlinkKafkaConsumer011 and FlinkKafkaProducer011. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] alexeyt820 opened a new pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provider Kafka headers

2018-08-26 Thread GitBox
alexeyt820 opened a new pull request #6615: [FLINK-8354] [flink-connectors] Add 
ability to access and provider Kafka headers
URL: https://github.com/apache/flink/pull/6615
 
 
   
   ## What is the purpose of the change
   This change allows access to Kafka headers in KeyedDeserializationSchema, 
and provides Kafka headers via the KeyedSerializationSchema.headers method.
   
   ## Brief change log
   - Add default _headers_ method to KeyedSerializationSchema interface
   - Add default deserialize method with _headers_ argument to 
KeyedDeserializationSchema
   - Adapt Kafka011Fetcher to get headers from the Kafka ConsumerRecord and provide 
them as a parameter to KeyedDeserializationSchema
   - Modify FlinkKafkaProducer011 to request _headers_ from 
KeyedSerializationSchema and add them to ProducerRecord.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
 - *Added integration test Kafka011ITCase.testHeaders to verify that we 
can produce headers and consume them*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (yes)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9172) Support external catalog factory that comes default with SQL-Client

2018-08-26 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592927#comment-16592927
 ] 

Rong Rong commented on FLINK-9172:
--

[~eronwright] Yes, you are right. This is the additional feature in FLIP-24 
that needs to be implemented. 

> Support external catalog factory that comes default with SQL-Client
> ---
>
> Key: FLINK-9172
> URL: https://issues.apache.org/jira/browse/FLINK-9172
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> It would be great to have the SQL Client support some external catalogs 
> out of the box for SQL users to configure and utilize easily. I am currently 
> thinking of having an external catalog factory that spins up both streaming and 
> batch external catalog table sources and sinks. This could greatly unify things and 
> provide easy access for SQL users. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-08-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592910#comment-16592910
 ] 

ASF GitHub Bot commented on FLINK-8532:
---

StephanEwen commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-416043267
 
 
   Thanks for taking a deeper look. Unfortunately, divisions (modulo) are even 
more expensive, so it would be good to avoid them.
   
   I think the solution can actually be a bit simpler. It would probably be 
sufficient to simply initialize the array to `INT_MAX - 1` and replace the 
`this.returnArray[0] = 0;` in the original code with `this.returnArray[0] = 
resetValue()`. Inside `resetValue()` you can do the initialization.
   
   That way, common cases have no additional check, and the overflow/reset case 
gets one additional branch, which is already a good improvement.
   
   We could possibly do a followup optimization, where outputs that have only 
one channel swap in a special selector that always returns just `0`. The 
one-channel-only case is probably the one that would be affected most by this 
change, because it always overflows each time.
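   
   A hedged sketch of the suggested structure; the field and method names follow the 
discussion above, but the surrounding class is simplified and does not reproduce the real 
`StreamPartitioner` hierarchy:
   
   ```java
   import java.util.concurrent.ThreadLocalRandom;
   
   // Illustration only: just the channel-selection logic.
   class RebalanceChannelSelector {
   
       // Start "almost overflowed" so the very first record goes through resetValue().
       private final int[] returnArray = new int[] { Integer.MAX_VALUE - 1 };
   
       int[] selectChannels(int numberOfOutputChannels) {
           int next = returnArray[0] + 1;
           if (next >= numberOfOutputChannels) {
               // Overflow/reset case: one extra branch; this sketch re-randomizes on every wrap-around.
               returnArray[0] = resetValue(numberOfOutputChannels);
           } else {
               // Common case: a plain increment, no modulo.
               returnArray[0] = next;
           }
           return returnArray;
       }
   
       private int resetValue(int numberOfOutputChannels) {
           return ThreadLocalRandom.current().nextInt(numberOfOutputChannels);
       }
   }
   ```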


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Assignee: Guibo Pan
>Priority: Major
>  Labels: pull-request-available
>
> In some conditions, RebalancePartitioner doesn't balance data correctly 
> because it uses the same value for selecting the next operators.
> RebalancePartitioner initializes its partition id with the same value in 
> every thread, so it does balance data overall, but at any given moment the amount of 
> data in each operator is skewed.
> In particular, when the data rate of the former operators is equal, the data skew 
> becomes severe.
>  
>  
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator (map1, map2) contains three subtasks (subtasks 1-3 and 4-6, 
> respectively).
> map1          map2
>  st1              st4
>  st2              st5
>  st3              st6
>  
> At the beginning, every subtask in map1 sends data to st4 in map2 because 
> they use the same initial partition id.
> The next time map1 receives data, st1, st2, and st3 send data to st5 because they 
> increment their partition id after processing the former data.
> In my environment, it takes twice as long to process data when I use 
> RebalancePartitioner as when I use other partitioners (rescale, keyBy).
>  
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition

2018-08-26 Thread GitBox
StephanEwen commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-416043267
 
 
   Thanks for taking a deeper look. Unfortunately, divisions (modulo) are even 
more expensive, so it would be good to avoid them.
   
   I think the solution can actually be a bit simpler. It would probably be 
sufficient to simply initialize the array to `INT_MAX - 1` and replace the 
`this.returnArray[0] = 0;` in the original code with `this.returnArray[0] = 
resetValue()`. Inside `resetValue()` you can do the initialization.
   
   That way, common cases have no additional check, and the overflow/reset case 
gets one additional branch, which is already a good improvement.
   
   We could possibly do a followup optimization, where outputs that have only 
one channel swap in a special selector that always returns just `0`. The 
one-channel-only case is probably the one that would be affected most by this 
change, because it always overflows each time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api

2018-08-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592884#comment-16592884
 ] 

ASF GitHub Bot commented on FLINK-5315:
---

hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r212824390
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -381,6 +381,36 @@ Table result = orders
 {% highlight java %}
 Table orders = tableEnv.scan("Orders");
 Table result = orders.distinct();
+{% endhighlight %}
+Note: For streaming queries the required state to compute 
the query result might grow infinitely depending on the number of distinct 
fields. Please provide a query configuration with valid retention interval to 
prevent excessive state size. See Streaming 
Concepts for details.
+  
+
+
+  
+Distinct Aggregation
+Streaming
 
 Review comment:
   Add a batch label? It seems batch already supports distinct. BTW, over is not 
yet supported in batch.
   
   Maybe it would be better to remove this documentation about distinct aggregation 
and instead append a distinct column to all the aggregation examples?
   For `GroupBy Aggregation`, change the example 
   from
   ```
   Table orders = tableEnv.scan("Orders");
   Table result = orders.groupBy("a").select("a, b.sum as d");
   ```
   to
   ```
   Table orders = tableEnv.scan("Orders");
   Table result = orders.groupBy("a").select("a, b.sum as d, b.sum.distinct as 
e");
   ```
   What's more, we should probably add UDAGG documentation, like the one in the SQL 
document's Aggregations section, and add a distinct column.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support distinct aggregations in table api
> --
>
> Key: FLINK-5315
> URL: https://issues.apache.org/jira/browse/FLINK-5315
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Rong Rong
>Priority: Major
>  Labels: pull-request-available
>
> Support distinct aggregations in Table API in the following format:
> For Expressions:
> {code:scala}
> 'a.count.distinct // Expressions distinct modifier
> {code}
> For User-defined Function:
> {code:scala}
> singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier
> multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] Adding support for distinct operation for table API on DataStream

2018-08-26 Thread GitBox
hequn8128 commented on a change in pull request #6521: [FLINK-5315][table] 
Adding support for distinct operation for table API on DataStream
URL: https://github.com/apache/flink/pull/6521#discussion_r212824390
 
 

 ##
 File path: docs/dev/table/tableApi.md
 ##
 @@ -381,6 +381,36 @@ Table result = orders
 {% highlight java %}
 Table orders = tableEnv.scan("Orders");
 Table result = orders.distinct();
+{% endhighlight %}
+Note: For streaming queries the required state to compute 
the query result might grow infinitely depending on the number of distinct 
fields. Please provide a query configuration with valid retention interval to 
prevent excessive state size. See Streaming 
Concepts for details.
+  
+
+
+  
+Distinct Aggregation
+Streaming
 
 Review comment:
   Add a batch label? It seems batch already supports distinct. BTW, over is not 
yet supported in batch.
   
   Maybe it would be better to remove this documentation about distinct aggregation 
and instead append a distinct column to all the aggregation examples?
   For `GroupBy Aggregation`, change the example 
   from
   ```
   Table orders = tableEnv.scan("Orders");
   Table result = orders.groupBy("a").select("a, b.sum as d");
   ```
   to
   ```
   Table orders = tableEnv.scan("Orders");
   Table result = orders.groupBy("a").select("a, b.sum as d, b.sum.distinct as 
e");
   ```
   What's more, we should probably add UDAGG documentation, like the one in the SQL 
document's Aggregations section, and add a distinct column.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10219) remove the IRC mention from the website

2018-08-26 Thread Nicos Maris (JIRA)
Nicos Maris created FLINK-10219:
---

 Summary: remove the IRC mention from the website
 Key: FLINK-10219
 URL: https://issues.apache.org/jira/browse/FLINK-10219
 Project: Flink
  Issue Type: Task
  Components: Project Website
Reporter: Nicos Maris


This is the outcome of the following ticket along with a PR.

https://issues.apache.org/jira/browse/FLINK-10217



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9432) Support extract epoch, decade, millisecond, microsecond

2018-08-26 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9432:
--
Labels: pull-request-available  (was: )

> Support extract epoch, decade, millisecond, microsecond
> ---
>
> Key: FLINK-9432
> URL: https://issues.apache.org/jira/browse/FLINK-9432
> Project: Flink
>  Issue Type: New Feature
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> The task is to separate the activity that depends on 
> https://issues.apache.org/jira/browse/CALCITE-2303 from everything else that could 
> be done without upgrading Avatica/Calcite in 
> https://issues.apache.org/jira/browse/FLINK-8518.
> Currently the implementations of the following functions are blocked:
> {code:sql}
> extract(decade from ...)
> extract(epoch from ...)
> extract(millisecond from ...)
> extract(microsecond from ...)
> extract(isodow from ...)
> extract(isoyear from ...)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9432) Support extract epoch, decade, millisecond, microsecond

2018-08-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592864#comment-16592864
 ] 

ASF GitHub Bot commented on FLINK-9432:
---

snuyanzin opened a new pull request #6614: [FLINK-9432] Support extract 
timeunit: DECADE, EPOCH, ISODOW, ISOYEAR…
URL: https://github.com/apache/flink/pull/6614
 
 
   ## What is the purpose of the change
   The PR provides an implementation of the `extract` function for the time units 
`decade`, `epoch`, `isodow`, `isoyear`, `millisecond`, and `microsecond`.
   
   ## Brief change log
   
   - support the additional time units in `extract`
   - documentation updates (mention all possible time units for `extract`)
   - tests
   
   ## Verifying this change
 - *Added tests validating the result of `extract` calls*
 - *Added tests validating that `extract` of `isodow`, `isodoy`, `epoch`, 
`decade` for a time value is invalid*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   functions.md updated
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support extract epoch, decade, millisecond, microsecond
> ---
>
> Key: FLINK-9432
> URL: https://issues.apache.org/jira/browse/FLINK-9432
> Project: Flink
>  Issue Type: New Feature
>Reporter: Sergey Nuyanzin
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
>
> The task is to separate the activity that depends on 
> https://issues.apache.org/jira/browse/CALCITE-2303 from everything else that could 
> be done without upgrading Avatica/Calcite in 
> https://issues.apache.org/jira/browse/FLINK-8518.
> Currently the implementations of the following functions are blocked:
> {code:sql}
> extract(decade from ...)
> extract(epoch from ...)
> extract(millisecond from ...)
> extract(microsecond from ...)
> extract(isodow from ...)
> extract(isoyear from ...)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] snuyanzin opened a new pull request #6614: [FLINK-9432] Support extract timeunit: DECADE, EPOCH, ISODOW, ISOYEAR…

2018-08-26 Thread GitBox
snuyanzin opened a new pull request #6614: [FLINK-9432] Support extract 
timeunit: DECADE, EPOCH, ISODOW, ISOYEAR…
URL: https://github.com/apache/flink/pull/6614
 
 
   ## What is the purpose of the change
   The PR provides an implementation of the `extract` function for the time units 
`decade`, `epoch`, `isodow`, `isoyear`, `millisecond`, and `microsecond`.
   
   ## Brief change log
   
   - support the additional time units in `extract`
   - documentation updates (mention all possible time units for `extract`)
   - tests
   
   ## Verifying this change
 - *Added tests validating the result of `extract` calls*
 - *Added tests validating that `extract` of `isodow`, `isodoy`, `epoch`, 
`decade` for a time value is invalid*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   functions.md updated
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10217) use Slack for user support and questions

2018-08-26 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10217.

Resolution: Won't Fix

Slack was already discussed recently on the ML and rejected.

> use Slack for user support and questions
> 
>
> Key: FLINK-10217
> URL: https://issues.apache.org/jira/browse/FLINK-10217
> Project: Flink
>  Issue Type: Task
>  Components: Project Website
>Reporter: Nicos Maris
>Priority: Critical
>
> h2. Current status
> For user support and questions, users are instructed to subscribe to 
> u...@flink.apache.org, but there are users like me who also enjoy using a chat 
> channel. However, the instructions for doing so are not clear, the IRC 
> activity is low, and it is definitely [not indicative of the project's 
> activity|https://issues.apache.org/jira/browse/FLINK-3862?focusedCommentId=16152376=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16152376].
> The [website|https://flink.apache.org/community.html] mentions that "If you 
> want to talk with the Flink committers and users in a chat, there is an [IRC 
> channel|https://flink.apache.org/community.html#irc]."
> h2. Option 1: Use Slack
> An example of an Apache project that uses 
> [Slack|https://tedium.co/2017/10/17/irc-vs-slack-chat-history] is 
> http://mesos.apache.org/community.
> I can assist in setting it up if at least one expert joins from the very 
> beginning.
> h2. Option 2: Keep using IRC and document it
> Add the [missing 
> section|https://github.com/apache/flink-web/blob/master/community.md#irc] to 
> the website, along with instructions for people who have never used IRC.
> h2. Option 3: Use only the mailing list
> Use only u...@flink.apache.org for user support and questions, and do not 
> mention IRC on the website.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10218) Allow write DataSet without path parameter

2018-08-26 Thread Paul Lin (JIRA)
Paul Lin created FLINK-10218:


 Summary: Allow write DataSet without path parameter
 Key: FLINK-10218
 URL: https://issues.apache.org/jira/browse/FLINK-10218
 Project: Flink
  Issue Type: Improvement
  Components: DataSet API
Affects Versions: 1.6.0
Reporter: Paul Lin


Currently, the DataSet API has two overloaded write methods for using 
FileOutputFormat as the output format, and both require a path parameter, but the 
output path could already be set in the FileOutputFormat object. What's more, 
the subclasses of FileOutputFormat mostly don't have default constructors and 
require a path parameter too, so users have to set the output path twice in the 
code, like:
{code:java}
  String output = "hdfs:///tmp/";
  dataset.write(new TextOutputFormat<>(new Path(output)), output);
{code}
So I propose to add another write method that requires no path parameter. May 
someone assign this issue to me?
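
A minimal sketch of what such a helper could look like, just to make the idea 
concrete (the method shape is only illustrative, not a final API):
{code:java}
/**
 * Illustrative sketch: write this DataSet with the given FileOutputFormat,
 * reusing the output path that is already configured on the format.
 */
public DataSink<T> write(FileOutputFormat<T> outputFormat) {
	Path outputPath = outputFormat.getOutputFilePath();
	if (outputPath == null) {
		throw new IllegalArgumentException("The output path must be set on the FileOutputFormat.");
	}
	// Delegate to the existing overload that takes an explicit path.
	return write(outputFormat, outputPath.toString());
}
{code}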



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9311) PubSub connector

2018-08-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592832#comment-16592832
 ] 

ASF GitHub Bot commented on FLINK-9311:
---

Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r212798502
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Timer;
+import java.util.TimerTask;
+
+class Bound<OUT> implements Serializable {
+	private static final Logger LOG = LoggerFactory.getLogger(Bound.class);
+
+	private final Bound.Mode mode;
+	private final long maxMessagedReceived;
+	private final long maxTimeBetweenMessages;
+
+	private SourceFunction<OUT> sourceFunction;
+	private transient Timer timer;
+	private long messagesReceived;
+	private long lastReceivedMessage;
+	private boolean cancelled = false;
+
+	private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) {
+		this.mode = mode;
+		this.maxMessagedReceived = maxMessagedReceived;
+		this.maxTimeBetweenMessages = maxTimeBetweenMessages;
+		this.messagesReceived = 0L;
+	}
+
+	static <OUT> Bound<OUT> boundByAmountOfMessages(long maxMessagedReceived) {
+		return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L);
+	}
+
+	static <OUT> Bound<OUT> boundByTimeSinceLastMessage(long maxTimeBetweenMessages) {
+		return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages);
+	}
+
+	static <OUT> Bound<OUT> boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) {
+		return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages);
+	}
+
+	private TimerTask shutdownPubSubSource() {
+		return new TimerTask() {
+			@Override
+			public void run() {
+				if (maxTimeBetweenMessagesElapsed()) {
+					cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source");
+					timer.cancel();
+				}
+			}
+		};
+	}
+
+	private synchronized boolean maxTimeBetweenMessagesElapsed() {
+		return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages;
+	}
+
+	private synchronized void cancelPubSubSource(String logMessage) {
+		if (!cancelled) {
+			cancelled = true;
+			sourceFunction.cancel();
+			LOG.info(logMessage);
+		}
+	}
+
+	void start(SourceFunction<OUT> sourceFunction) {
+		if (this.sourceFunction != null) {
+			throw new IllegalStateException("start() already called");
+		}
+
+		this.sourceFunction = sourceFunction;
+		messagesReceived = 0;
+
+		if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) {
+			lastReceivedMessage = System.currentTimeMillis();
+			timer = new Timer();
+			timer.schedule(shutdownPubSubSource(), 0, 100);
+		}
+	}
+
+	synchronized void receivedMessage() {
+		if (sourceFunction == null) {
+			throw new IllegalStateException("start() not called");
+		}
+
+		lastReceivedMessage = System.currentTimeMillis();
+		messagesReceived++;
 
 Review comment:
   What information could we report as metrics? I was assuming counters for 
messages received by the PubSubSource would be available as metrics, or is this 
not the case?

[GitHub] Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2018-08-26 Thread GitBox
Xeli commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added 
PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r212798502
 
 

 ##
 File path: 
flink-connectors/flink-connector-pubsub/src/main/java/org/apache/flink/streaming/connectors/pubsub/Bound.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pubsub;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Timer;
+import java.util.TimerTask;
+
+class Bound<OUT> implements Serializable {
+	private static final Logger LOG = LoggerFactory.getLogger(Bound.class);
+
+	private final Bound.Mode mode;
+	private final long maxMessagedReceived;
+	private final long maxTimeBetweenMessages;
+
+	private SourceFunction<OUT> sourceFunction;
+	private transient Timer timer;
+	private long messagesReceived;
+	private long lastReceivedMessage;
+	private boolean cancelled = false;
+
+	private Bound(Bound.Mode mode, long maxMessagedReceived, long maxTimeBetweenMessages) {
+		this.mode = mode;
+		this.maxMessagedReceived = maxMessagedReceived;
+		this.maxTimeBetweenMessages = maxTimeBetweenMessages;
+		this.messagesReceived = 0L;
+	}
+
+	static <OUT> Bound<OUT> boundByAmountOfMessages(long maxMessagedReceived) {
+		return new Bound<>(Mode.COUNTER, maxMessagedReceived, 0L);
+	}
+
+	static <OUT> Bound<OUT> boundByTimeSinceLastMessage(long maxTimeBetweenMessages) {
+		return new Bound<>(Mode.TIMER, 0L, maxTimeBetweenMessages);
+	}
+
+	static <OUT> Bound<OUT> boundByAmountOfMessagesOrTimeSinceLastMessage(long maxMessagedReceived, long maxTimeBetweenMessages) {
+		return new Bound<>(Mode.COUNTER_OR_TIMER, maxMessagedReceived, maxTimeBetweenMessages);
+	}
+
+	private TimerTask shutdownPubSubSource() {
+		return new TimerTask() {
+			@Override
+			public void run() {
+				if (maxTimeBetweenMessagesElapsed()) {
+					cancelPubSubSource("BoundedSourceFunction: Idle timeout --> canceling source");
+					timer.cancel();
+				}
+			}
+		};
+	}
+
+	private synchronized boolean maxTimeBetweenMessagesElapsed() {
+		return System.currentTimeMillis() - lastReceivedMessage > maxTimeBetweenMessages;
+	}
+
+	private synchronized void cancelPubSubSource(String logMessage) {
+		if (!cancelled) {
+			cancelled = true;
+			sourceFunction.cancel();
+			LOG.info(logMessage);
+		}
+	}
+
+	void start(SourceFunction<OUT> sourceFunction) {
+		if (this.sourceFunction != null) {
+			throw new IllegalStateException("start() already called");
+		}
+
+		this.sourceFunction = sourceFunction;
+		messagesReceived = 0;
+
+		if (mode == Mode.TIMER || mode == Mode.COUNTER_OR_TIMER) {
+			lastReceivedMessage = System.currentTimeMillis();
+			timer = new Timer();
+			timer.schedule(shutdownPubSubSource(), 0, 100);
+		}
+	}
+
+	synchronized void receivedMessage() {
+		if (sourceFunction == null) {
+			throw new IllegalStateException("start() not called");
+		}
+
+		lastReceivedMessage = System.currentTimeMillis();
+		messagesReceived++;
 
 Review comment:
   What information could we report as metrics? I was assuming counters for 
messages received by the PubSubSource would be available as metrics, or is this 
not the case?
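
   For context, a minimal sketch (not taken from this PR) of how a counter could be 
exposed through Flink's metric system in a rich source function; the class name and 
the `pollNextMessage` helper are made up for illustration:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Sketch only: assumes the source extends RichSourceFunction so the metric group is reachable.
public class MetricsSketchSource extends RichSourceFunction<String> {

	private transient Counter messagesReceivedCounter;
	private volatile boolean running = true;

	@Override
	public void open(Configuration parameters) {
		// Register a counter under this operator's metric group.
		messagesReceivedCounter = getRuntimeContext().getMetricGroup().counter("messagesReceived");
	}

	@Override
	public void run(SourceContext<String> ctx) throws Exception {
		while (running) {
			// Stand-in for the actual PubSub pull; a real source would block or poll here.
			String message = pollNextMessage();
			if (message == null) {
				Thread.sleep(50);
				continue;
			}
			messagesReceivedCounter.inc();
			ctx.collect(message);
		}
	}

	@Override
	public void cancel() {
		running = false;
	}

	private String pollNextMessage() {
		return null; // placeholder for a real message source
	}
}
```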


This is an automated message