[GitHub] flink issue #4560: Flink 7077

2017-08-18 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4560
  
@tillrohrmann fixed the test issue and other noted issues.  Once #4555 is 
in, this is good to go.

The test issue was a race between the RM acquiring leadership and the RPC call.   Added 
some code to `TestLeaderElectionService` to wait for the leader acknowledgement and updated the 
base RM and Mesos RM to use it. 
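
A minimal sketch of that wait-for-ack hook, assuming a `CompletableFuture`-based 
acknowledgement (illustrative only; the actual class in the PR may differ):

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical test-scoped leader election service: the contender confirms
// leadership, and test code blocks on the future before issuing RPC calls,
// closing the leadership-grant-vs-RPC race described above.
public class TestLeaderElectionService {

    private final CompletableFuture<UUID> leaderAck = new CompletableFuture<>();

    /** Called by the contender (e.g. the resource manager) once it has accepted leadership. */
    public void confirmLeaderSessionID(UUID leaderSessionID) {
        leaderAck.complete(leaderSessionID);
    }

    /** Tests block here until leadership has actually been acknowledged. */
    public UUID waitForLeaderAck() throws Exception {
        return leaderAck.get();
    }
}
```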




[jira] [Comment Edited] (FLINK-7479) Support to retrieve the past event by physical offset

2017-08-18 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133888#comment-16133888
 ] 

Dian Fu edited comment on FLINK-7479 at 8/19/17 2:27 AM:
-

Hi [~kkl0u], 
Thanks a lot for your reply.
{quote}
Before opening a PR for a feature, we first have to discuss whether we want to 
include that feature in the library. 
If we do not follow this process, the only thing that will happen is that we pile 
up PRs that we do not know how to handle.
{quote}
Agreed. Next time I will open the PR only after we have agreed on the feature. Thanks for 
the reminder. :)

{quote}
I have not looked at the code itself, but from the topic, this seems to 
just be syntactic sugar on top of the existing APIs.
{quote}
I don't think so. The tests in the PR show that this functionality cannot be 
achieved on top of the existing APIs.

{quote}
Until users start using the library and ask for features, and given that this 
is a first iteration of the CEP/SQL integration, 
I would suggest keeping it minimal and avoiding additions that only contribute 
"facilitation methods" for already existing functionality.
{quote}
Totally agree. I will try to keep the CEP/SQL integration minimal and 
only add features when required.  For the {{PREV}} clause, I think it should 
be included in the minimal feature set, as it is an important building block for 
pattern definition.


was (Author: dian.fu):
Hi [~kkl0u], 
{quote}
Before opening a PR for a feature, we first have to discuss whether we want to 
include that feature in the library. 
If we do not follow this process, the only thing that will happen is that we pile 
up PRs that we do not know how to handle.
{quote}
Agreed. Next time I will open the PR only after we have agreed on the feature. Thanks for 
the reminder. :)

{quote}
I have not looked at the code itself, but from the topic, this seems to 
just be syntactic sugar on top of the existing APIs.
{quote}
I don't think so. The tests in the PR show that this functionality cannot be 
achieved on top of the existing APIs.

{quote}
Until users start using the library and ask for features, and given that this 
is a first iteration of the CEP/SQL integration, 
I would suggest keeping it minimal and avoiding additions that only contribute 
"facilitation methods" for already existing functionality.
{quote}
Totally agree. I will try to keep the CEP/SQL integration minimal and 
only add features when required.  For the {{PREV}} clause, I think it should 
be included in the minimal feature set, as it is an important building block for 
pattern definition.

> Support to retrieve the past event by physical offset 
> --
>
> Key: FLINK-7479
> URL: https://issues.apache.org/jira/browse/FLINK-7479
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, it is already possible to retrieve events matched to the specified 
> pattern in {{IterativeCondition.Context}}. However, there are also requirements 
> to retrieve events by a physical offset. The retrieved events may not be 
> matched to any pattern.






[jira] [Commented] (FLINK-7479) Support to retrieve the past event by physical offset

2017-08-18 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133628#comment-16133628
 ] 

Kostas Kloudas commented on FLINK-7479:
---

Hi [~dian.fu], 

Before opening a PR for a feature, we first have to discuss whether we want to 
include that feature in the library. 
If we do not follow this process, the only thing that will happen is that we pile 
up PRs that we do not know how to handle. 

I have not looked at the code itself, but from the topic, this seems to 
just be syntactic sugar on top of the existing APIs.
It may be necessary for completeness, as it is in the SQL specification, but a 
specification is just a specification until users start using it 
or asking for it. Until users start using the library and ask for features, and 
given that this is a first iteration of the CEP/SQL integration, 
I would suggest keeping it minimal and avoiding additions that only contribute 
"facilitation methods" for already existing functionality.


> Support to retrieve the past event by physical offset 
> --
>
> Key: FLINK-7479
> URL: https://issues.apache.org/jira/browse/FLINK-7479
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, it is already possible to retrieve events matched to the specified 
> pattern in {{IterativeCondition.Context}}. However, there are also requirements 
> to retrieve events by a physical offset. The retrieved events may not be 
> matched to any pattern.





[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133557#comment-16133557
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user rangadi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134044031
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 

[GitHub] flink pull request #4239: [FLINK-6988] flink-connector-kafka-0.11 with exact...

2017-08-18 Thread rangadi
Github user rangadi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134044031
  
--- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.11.x. By default producer
+ * will use {@link Semantic#EXACTLY_ONCE} semantic.
+ *
+ * Implementation note: This producer is a hybrid between a regular
+ * {@link 

[GitHub] flink pull request #4560: Flink 7077

2017-08-18 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4560#discussion_r134004745
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java ---
@@ -68,7 +68,9 @@ public boolean isIdle() {
}
 
public void markIdle() {
-   idleSince = System.currentTimeMillis();
+   if (!isIdle()) {
+   idleSince = System.currentTimeMillis();
+   }
--- End diff --

The reason the SlotManager idleness unit tests didn't catch this is that `markIdle` 
is called whenever a slot report is received, which in practice happens continuously 
but is tough to simulate.  Just making clear that there are idle tests, but I 
didn't see an obvious way to improve them.
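
For illustration, a self-contained sketch of the idleness bookkeeping the diff 
fixes (hypothetical class, not the actual `TaskManagerRegistration`):

```java
// markIdle() must be idempotent: otherwise every periodic slot report resets
// idleSince and the instance never looks idle long enough to be released.
public class IdleTracker {

    private long idleSince = Long.MAX_VALUE; // MAX_VALUE means "not idle"

    public boolean isIdle() {
        return idleSince != Long.MAX_VALUE;
    }

    public void markIdle() {
        if (!isIdle()) { // record only the first transition to idle
            idleSince = System.currentTimeMillis();
        }
    }

    public void markUsed() {
        idleSince = Long.MAX_VALUE;
    }

    public boolean idleLongerThan(long timeoutMillis) {
        return isIdle() && System.currentTimeMillis() - idleSince >= timeoutMillis;
    }
}
```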




[GitHub] flink pull request #4560: Flink 7077

2017-08-18 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4560#discussion_r134003593
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -960,12 +965,26 @@ public void handleError(final Exception exception) {
 
@Override
public void releaseResource(InstanceID instanceId) {
-   stopWorker(instanceId);
+   runAsync(new Runnable() {
+   @Override
+   public void run() {
+   for (Map.Entry entry : taskExecutors.entrySet()) {
+   if (entry.getValue().getInstanceID().equals(instanceId)) {
+   stopWorker(entry.getKey());
--- End diff --

I was thinking along those same lines - the slot manager deals mostly with 
`InstanceID`, and its log lines are tough to correlate with lower-level 
resource manager information that is `ResourceID`-based.   It would be a nice 
improvement to make `InstanceID` a composite key that includes `ResourceID`.

Regardless, I think the `stopWorker` method should use `ResourceID`, 
because `InstanceID` isn't a concept exposed to the RM subclasses.
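
A hypothetical sketch of that composite-key idea (not actual Flink code; 
`ResourceID` stands in as a plain string here):

```java
import java.util.UUID;

// An InstanceID that carries the ResourceID it was registered under, so
// SlotManager log lines can be correlated with ResourceManager ones.
public final class CompositeInstanceID {

    private final String resourceId;   // stand-in for Flink's ResourceID
    private final UUID registrationId; // unique per registration attempt

    public CompositeInstanceID(String resourceId) {
        this.resourceId = resourceId;
        this.registrationId = UUID.randomUUID();
    }

    public String getResourceID() {
        return resourceId;
    }

    @Override
    public String toString() {
        // logging the InstanceID now also exposes the ResourceID
        return resourceId + "/" + registrationId;
    }
}
```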





[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133184#comment-16133184
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4564
  
Thanks, this looks better to me!

I would suggest not passing the configuration into the library cache 
manager.
I think passing configuration objects into specialized / dedicated 
components is an anti-pattern: it makes testing complicated, signatures 
inexplicit, etc.

For the second task of the classloader, loading resources: do we have a 
test that validates the resource resolution? I think this passes the tests of 
the previous pull request because it behaves for resources like a *parent-first 
classloader*, which seems inconsistent. Admittedly, the existing implementation 
of the child-first classloader was, for resources, a child-only classloader, 
which was also not correct.


> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>






[GitHub] flink issue #4564: [FLINK-7442] Add option for using child-first classloader...

2017-08-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4564
  
Thanks, this looks better to me!

I would suggest not passing the configuration into the library cache 
manager.
I think passing configuration objects into specialized / dedicated 
components is an anti-pattern: it makes testing complicated, signatures 
inexplicit, etc.

For the second task of the classloader, loading resources: do we have a 
test that validates the resource resolution? I think this passes the tests of 
the previous pull request because it behaves for resources like a *parent-first 
classloader*, which seems inconsistent. Admittedly, the existing implementation 
of the child-first classloader was, for resources, a child-only classloader, 
which was also not correct.
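
To illustrate the distinction, a minimal child-first classloader whose resource 
lookup still falls back to the parent (a sketch under these assumptions, not the 
PR's actual implementation):

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;

public class ChildFirstClassLoader extends URLClassLoader {

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name); // child first (real impls must still delegate JDK classes)
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, false); // fall back to the parent
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    @Override
    public Enumeration<URL> getResources(String name) throws IOException {
        // child results first, then the parent's: neither child-only nor parent-first
        List<URL> urls = new ArrayList<>(Collections.list(findResources(name)));
        urls.addAll(Collections.list(getParent().getResources(name)));
        return Collections.enumeration(urls);
    }
}
```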




[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133174#comment-16133174
 ] 

ASF GitHub Bot commented on FLINK-7477:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4566
  
@rmetzger I addressed your comments.


> Use "hadoop classpath" to augment classpath when available
> --
>
> Key: FLINK-7477
> URL: https://issues.apache.org/jira/browse/FLINK-7477
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Currently, some cloud environments don't properly put the Hadoop jars into 
> {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}} at all). We should 
> check in {{config.sh}} whether the {{hadoop}} binary is on the path and augment 
> our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
> our scripts.
> This will improve the out-of-box experience for users who otherwise have to 
> manually set {{HADOOP_CLASSPATH}}.





[GitHub] flink issue #4566: [FLINK-7477] [FLINK-7480] Various improvements to Flink s...

2017-08-18 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4566
  
@rmetzger I addressed your comments.




[jira] [Commented] (FLINK-7423) Always reuse an instance to get elements from the inputFormat

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133142#comment-16133142
 ] 

ASF GitHub Bot commented on FLINK-7423:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4525
  
I think `null` values are not really permitted at the moment, but the 
InputFormats do not explicitly forbid them. That's why the logic is not 
"run until nextRecord() returns null", but "run until reachedEnd()".

The change here implements a mixed contract like "run until reachedEnd() or 
nextRecord() returns null", which is more complicated to document, enforce, test, and 
understand for users.



> Always reuse an instance  to get elements from the inputFormat 
> ---
>
> Key: FLINK-7423
> URL: https://issues.apache.org/jira/browse/FLINK-7423
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Xu Pingyong
>Assignee: Xu Pingyong
>
> In InputFormatSourceFunction.java:
> {code:java}
> OUT nextElement = serializer.createInstance();
> while (isRunning) {
>     format.open(splitIterator.next());
>     // for each element we also check if cancel
>     // was called by checking the isRunning flag
>     while (isRunning && !format.reachedEnd()) {
>         nextElement = format.nextRecord(nextElement);
>         if (nextElement != null) {
>             ctx.collect(nextElement);
>         } else {
>             break;
>         }
>     }
>     format.close();
>     completedSplitsCounter.inc();
>     if (isRunning) {
>         isRunning = splitIterator.hasNext();
>     }
> }
> {code}
> The format may return a different instance or null from nextRecord, which may 
> cause an exception.





[GitHub] flink issue #4525: [FLINK-7423] Always reuse an instance to get elements fro...

2017-08-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4525
  
I think `null` values are not really permitted at the moment, but the 
InputFormats do not explicitly forbid them. That's why the logic is not 
"run until nextRecord() returns null", but "run until reachedEnd()".

The change here implements a mixed contract like "run until reachedEnd() or 
nextRecord() returns null", which is more complicated to document, enforce, test, and 
understand for users.
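
A sketch of the plain contract described above, with simplified stand-in types 
(not the real `InputFormat` interface):

```java
import java.util.function.Consumer;

interface SimpleFormat<T> {
    boolean reachedEnd();
    T nextRecord(T reuse); // may return null without meaning "end of input"
}

final class ContractSketch {
    // "run until reachedEnd()": a null record is skipped, never treated as EOF
    static <T> void drive(SimpleFormat<T> format, T reuse, Consumer<T> collector) {
        while (!format.reachedEnd()) {
            T next = format.nextRecord(reuse);
            if (next != null) {
                collector.accept(next);
            }
        }
    }
}
```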





[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133120#comment-16133120
 ] 

ASF GitHub Bot commented on FLINK-7477:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/4566#discussion_r133985865
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -351,8 +351,20 @@ if [ -z "$HADOOP_CONF_DIR" ]; then
 fi
 fi
 
+# try and set HADOOP_CONF_DIR to some common default if it's not set
+if [ -z "$HADOOP_CONF_DIR" ]; then
+if [ -d "/etc/hadoop/conf" ]; then
+HADOOP_CONF_DIR="/etc/hadoop/conf"
+fi
+fi
+
 
INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
 
+# check if the "hadoop" binary is available, if yes, use that to augment the CLASSPATH
+if command -v hadoop >/dev/null 2>&1; then
+INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}:`hadoop classpath`"
--- End diff --

I would actually append to `INTERNAL_HADOOP_CLASSPATHS` instead of overwriting it. 


> Use "hadoop classpath" to augment classpath when available
> --
>
> Key: FLINK-7477
> URL: https://issues.apache.org/jira/browse/FLINK-7477
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Currently, some cloud environments don't properly put the Hadoop jars into 
> {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}}) at all. We should 
> check in {{config.sh}} if the {{hadoop}} binary is on the path and augment 
> our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
> our scripts.
> This will improve the out-of-box experience of users that otherwise have to 
> manually set {{HADOOP_CLASSPATH}}.





[GitHub] flink pull request #4566: [FLINK-7477] [FLINK-7480] Various improvements to ...

2017-08-18 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/4566#discussion_r133985865
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -351,8 +351,20 @@ if [ -z "$HADOOP_CONF_DIR" ]; then
 fi
 fi
 
+# try and set HADOOP_CONF_DIR to some common default if it's not set
+if [ -z "$HADOOP_CONF_DIR" ]; then
+if [ -d "/etc/hadoop/conf" ]; then
+HADOOP_CONF_DIR="/etc/hadoop/conf"
+fi
+fi
+
 
INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
 
+# check if the "hadoop" binary is available, if yes, use that to augment the CLASSPATH
+if command -v hadoop >/dev/null 2>&1; then
+INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}:`hadoop classpath`"
--- End diff --

I would actually append to `INTERNAL_HADOOP_CLASSPATHS` instead of overwriting it. 
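
A sketch of that suggestion against the diff above (same variable names, untested):

```sh
# augment the existing value instead of rebuilding it
if command -v hadoop >/dev/null 2>&1; then
    INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:$(hadoop classpath)"
fi
```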




[GitHub] flink pull request #4566: [FLINK-7477] [FLINK-7480] Various improvements to ...

2017-08-18 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/4566#discussion_r133985612
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -351,8 +351,20 @@ if [ -z "$HADOOP_CONF_DIR" ]; then
 fi
 fi
 
+# try and set HADOOP_CONF_DIR to some common default if it's not set
+if [ -z "$HADOOP_CONF_DIR" ]; then
+if [ -d "/etc/hadoop/conf" ]; then
+HADOOP_CONF_DIR="/etc/hadoop/conf"
--- End diff --

I would suggest printing a message to the user saying that we are using this 
HADOOP_CONF_DIR.
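
One possible form of that message, as a sketch (wording hypothetical):

```sh
# fall back to a common default and tell the user about it
if [ -z "$HADOOP_CONF_DIR" ] && [ -d "/etc/hadoop/conf" ]; then
    HADOOP_CONF_DIR="/etc/hadoop/conf"
    echo "Setting HADOOP_CONF_DIR=${HADOOP_CONF_DIR} because no HADOOP_CONF_DIR was set."
fi
```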




[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133119#comment-16133119
 ] 

ASF GitHub Bot commented on FLINK-7477:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/4566#discussion_r133985612
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -351,8 +351,20 @@ if [ -z "$HADOOP_CONF_DIR" ]; then
 fi
 fi
 
+# try and set HADOOP_CONF_DIR to some common default if it's not set
+if [ -z "$HADOOP_CONF_DIR" ]; then
+if [ -d "/etc/hadoop/conf" ]; then
+HADOOP_CONF_DIR="/etc/hadoop/conf"
--- End diff --

I would suggest printing a message to the user saying that we are using this 
HADOOP_CONF_DIR.


> Use "hadoop classpath" to augment classpath when available
> --
>
> Key: FLINK-7477
> URL: https://issues.apache.org/jira/browse/FLINK-7477
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Currently, some cloud environments don't properly put the Hadoop jars into 
> {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}} at all). We should 
> check in {{config.sh}} whether the {{hadoop}} binary is on the path and augment 
> our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
> our scripts.
> This will improve the out-of-box experience for users who otherwise have to 
> manually set {{HADOOP_CLASSPATH}}.





[jira] [Commented] (FLINK-7480) Set HADOOP_CONF_DIR to sane default if not set

2017-08-18 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133118#comment-16133118
 ] 

Robert Metzger commented on FLINK-7480:
---

+1

> Set HADOOP_CONF_DIR to sane default if not set
> --
>
> Key: FLINK-7480
> URL: https://issues.apache.org/jira/browse/FLINK-7480
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Currently, neither AWS nor GCE has a {{HADOOP_CONF_DIR}} set by default. 
> This makes the out-of-box experience on these cloud environments bad, because 
> not setting it results in errors whose cause is not obvious.
> In case {{HADOOP_CONF_DIR}} is not set, we should check whether 
> {{/etc/hadoop/conf}} exists and set {{HADOOP_CONF_DIR}} to that.





[GitHub] flink pull request #4560: Flink 7077

2017-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4560#discussion_r133980216
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -960,12 +965,26 @@ public void handleError(final Exception exception) {
 
@Override
public void releaseResource(InstanceID instanceId) {
-   stopWorker(instanceId);
+   runAsync(new Runnable() {
+   @Override
+   public void run() {
+   for (Map.Entry entry : taskExecutors.entrySet()) {
+   if (entry.getValue().getInstanceID().equals(instanceId)) {
+   stopWorker(entry.getKey());
--- End diff --

In the future we should make these IDs composed of each other. Then 
we could easily obtain the `ResourceID` from the `InstanceID`.




[GitHub] flink pull request #4560: Flink 7077

2017-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4560#discussion_r133980386
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java ---
@@ -68,7 +68,9 @@ public boolean isIdle() {
}
 
public void markIdle() {
-   idleSince = System.currentTimeMillis();
+   if (!isIdle()) {
+   idleSince = System.currentTimeMillis();
+   }
--- End diff --

good catch!




[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133081#comment-16133081
 ] 

ASF GitHub Bot commented on FLINK-7402:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4562#discussion_r133980441
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -220,6 +220,10 @@ void releaseBuffer() {
 
@Override
ByteBuf write(ByteBufAllocator allocator) throws IOException {
+   if (null == buffer) {
+   throw new NullPointerException();
--- End diff --

Add an exception message to provide more information.


> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the null check in the finally block:
> {code}
> finally {
>     if (buffer != null) {
>         buffer.recycle();
>     }
> }
> {code}
> But buffer has been dereferenced in the try block without a guard.





[GitHub] flink pull request #4562: [FLINK-7402] Fix ineffective null check in NettyMe...

2017-08-18 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4562#discussion_r133980441
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java ---
@@ -220,6 +220,10 @@ void releaseBuffer() {
 
@Override
ByteBuf write(ByteBufAllocator allocator) throws IOException {
+   if (null == buffer) {
+   throw new NullPointerException();
--- End diff --

Add an exception message to provide more information.
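
For illustration, the same check with a message, using `Objects.requireNonNull` 
(simplified stand-in types, not Netty's):

```java
import static java.util.Objects.requireNonNull;

final class BufferResponseSketch {
    private Object buffer;

    Object write() {
        // the message makes the resulting NullPointerException self-explanatory
        requireNonNull(buffer, "buffer is null; it may already have been released");
        return buffer;
    }
}
```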




[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133058#comment-16133058
 ] 

ASF GitHub Bot commented on FLINK-6630:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133975100
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+
+/**
+ * Entry point for Mesos per-job clusters.
+ */
+public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
+
+   public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+   // ------------------------------------------------------------------------
+   //  Command-line options
+   // ------------------------------------------------------------------------
+
+   private static final Options ALL_OPTIONS;
+
+   static {
+   ALL_OPTIONS =
+   new Options()
+   .addOption(BootstrapTools.newDynamicPropertiesOption());
+   }
+
+   private MesosConfiguration schedulerConfiguration;
+
+   private MesosServices mesosServices;
+
+   private MesosTaskManagerParameters taskManagerParameters;
+
+   private ContainerSpecification taskManagerContainerSpec;
+
+   public MesosJobClusterEntrypoint(Configuration config) {
+   super(config);
+   }
+
+   @Override
+   protected void initializeServices(Configuration config) throws Exception {
+   super.initializeServices(config);
+
+   final String hostname = config.getString(JobManagerOptions.ADDRESS);
+
+   // Mesos configuration
+   schedulerConfiguration = 

[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133057#comment-16133057
 ] 

ASF GitHub Bot commented on FLINK-6630:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133975661
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * The entry point for running a TaskManager in a Mesos container.
+ */
+public class MesosTaskExecutorRunner {
+
+   private static final Logger LOG = LoggerFactory.getLogger(MesosTaskExecutorRunner.class);
+
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   private static final Options ALL_OPTIONS;
+
+   static {
+   ALL_OPTIONS =
+   new Options()
+   .addOption(BootstrapTools.newDynamicPropertiesOption());
+   }
+
+   /** The process environment variables. */
+   private static final Map<String, String> ENV = System.getenv();
+
+   public static void main(String[] args) throws Exception {
+   EnvironmentInformation.logEnvironmentInfo(LOG, MesosTaskExecutorRunner.class.getSimpleName(), args);
+   SignalHandler.register(LOG);
+   JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+   // try to parse the command line arguments
+   CommandLineParser parser = new PosixParser();
+   CommandLine cmd = parser.parse(ALL_OPTIONS, args);
+
+   final Configuration configuration;
+   try {
+   final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+   GlobalConfiguration.setDynamicProperties(dynamicProperties);
+   LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+
+   configuration = GlobalConfiguration.loadConfiguration();
--- End diff --

Here I think we could use `MesosEntrypointUtils#loadConfiguration(cmd)`.


> Implement FLIP-6 MesosAppMasterRunner
> -
>
> Key: FLINK-6630
> URL: https://issues.apache.org/jira/browse/FLINK-6630
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>
> A new runner must be developed for the FLIP-6 RM.  Target the "single job" 
> scenario.
> Take some time to consider a general solution or a base implementation that 
> is shared with the old 

[GitHub] flink pull request #4555: [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMast...

2017-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133974899
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+
+/**
+ * Entry point for Mesos per-job clusters.
+ */
+public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
+
+   public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+   // ------------------------------------------------------------------------
+   //  Command-line options
+   // ------------------------------------------------------------------------
+
+   private static final Options ALL_OPTIONS;
+
+   static {
+   ALL_OPTIONS =
+   new Options()
+   .addOption(BootstrapTools.newDynamicPropertiesOption());
+   }
+
+   private MesosConfiguration schedulerConfiguration;
+
+   private MesosServices mesosServices;
+
+   private MesosTaskManagerParameters taskManagerParameters;
+
+   private ContainerSpecification taskManagerContainerSpec;
+
+   public MesosJobClusterEntrypoint(Configuration config) {
+   super(config);
+   }
+
+   @Override
+   protected void initializeServices(Configuration config) throws Exception {
+   super.initializeServices(config);
+
+   final String hostname = config.getString(JobManagerOptions.ADDRESS);
+
+   // Mesos configuration
+   schedulerConfiguration = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname);
+
+   // services
+   mesosServices = MesosServicesUtils.createMesosServices(config, hostname);
+
+   // TM configuration
+   

[GitHub] flink pull request #4555: [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMast...

2017-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133974223
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.configuration.MesosOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Utils for Mesos entrypoints.
+ */
+public class MesosEntrypointUtils {
+
+   @Deprecated
+   public static Configuration loadConfiguration(CommandLine cmd) {
+
+   // merge the dynamic properties from the command-line
+   Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+   GlobalConfiguration.setDynamicProperties(dynamicProperties);
+   Configuration config = GlobalConfiguration.loadConfiguration();
+
+   return config;
+   }
+
+   /**
+* Loads and validates the Mesos scheduler configuration.
+* @param flinkConfig the global configuration.
+* @param hostname the hostname to advertise to the Mesos master.
+*/
+   public static MesosConfiguration createMesosSchedulerConfiguration(Configuration flinkConfig, String hostname) {
+
+   Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder()
+   .setHostname(hostname);
+   Protos.Credential.Builder credential = null;
+
+   if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
+   throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured.");
+   }
+   String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL);
+
+   Duration failoverTimeout = FiniteDuration.apply(
+   flinkConfig.getInteger(
+   MesosOptions.FAILOVER_TIMEOUT_SECONDS),
+   TimeUnit.SECONDS);
+   frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
+
+   frameworkInfo.setName(flinkConfig.getString(
+   MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
+
+   frameworkInfo.setRole(flinkConfig.getString(
+   MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
+
+   frameworkInfo.setUser(flinkConfig.getString(
+   MesosOptions.RESOURCEMANAGER_FRAMEWORK_USER));
+
+   if (flinkConfig.contains(MesosOptions.RESOURCEMANAGER_FRAMEWORK_PRINCIPAL)) {
+

[GitHub] flink pull request #4555: [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMast...

2017-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133975661
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * The entry point for running a TaskManager in a Mesos container.
+ */
+public class MesosTaskExecutorRunner {
+
+   private static final Logger LOG = LoggerFactory.getLogger(MesosTaskExecutorRunner.class);
+
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   private static final Options ALL_OPTIONS;
+
+   static {
+   ALL_OPTIONS =
+   new Options()
+   .addOption(BootstrapTools.newDynamicPropertiesOption());
+   }
+
+   /** The process environment variables. */
+   private static final Map<String, String> ENV = System.getenv();
+
+   public static void main(String[] args) throws Exception {
+   EnvironmentInformation.logEnvironmentInfo(LOG, MesosTaskExecutorRunner.class.getSimpleName(), args);
+   SignalHandler.register(LOG);
+   JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+   // try to parse the command line arguments
+   CommandLineParser parser = new PosixParser();
+   CommandLine cmd = parser.parse(ALL_OPTIONS, args);
+
+   final Configuration configuration;
+   try {
+   final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+   GlobalConfiguration.setDynamicProperties(dynamicProperties);
+   LOG.debug("Mesos dynamic properties: {}", dynamicProperties);
+
+   configuration = GlobalConfiguration.loadConfiguration();
--- End diff --

Here I think we could use `MesosEntrypointUtils#loadConfiguration(cmd)`.
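
A minimal sketch of that suggestion (assuming the helper keeps the signature shown in the diff above; illustrative only):

{code:java}
import org.apache.commons.cli.CommandLine;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.mesos.entrypoint.MesosEntrypointUtils;

final class ConfigLoadingSketch {
    // Delegate to the shared helper instead of repeating the
    // dynamic-properties merge inline in MesosTaskExecutorRunner.main().
    static Configuration load(CommandLine cmd) {
        return MesosEntrypointUtils.loadConfiguration(cmd);
    }
}
{code}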


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133055#comment-16133055
 ] 

ASF GitHub Bot commented on FLINK-6630:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133974223
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.configuration.MesosOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay;
+import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Utils for Mesos entrypoints.
+ */
+public class MesosEntrypointUtils {
+
+   @Deprecated
+   public static Configuration loadConfiguration(CommandLine cmd) {
+
+   // merge the dynamic properties from the command-line
+   Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
+   GlobalConfiguration.setDynamicProperties(dynamicProperties);
+   Configuration config = GlobalConfiguration.loadConfiguration();
+
+   return config;
+   }
+
+   /**
+* Loads and validates the Mesos scheduler configuration.
+* @param flinkConfig the global configuration.
+* @param hostname the hostname to advertise to the Mesos master.
+*/
+   public static MesosConfiguration createMesosSchedulerConfiguration(Configuration flinkConfig, String hostname) {
+
+   Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder()
+   .setHostname(hostname);
+   Protos.Credential.Builder credential = null;
+
+   if (!flinkConfig.contains(MesosOptions.MASTER_URL)) {
+   throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured.");
+   }
+   String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL);
+
+   Duration failoverTimeout = FiniteDuration.apply(
+   flinkConfig.getInteger(
+   MesosOptions.FAILOVER_TIMEOUT_SECONDS),
+   TimeUnit.SECONDS);
+   frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds());
+
+   frameworkInfo.setName(flinkConfig.getString(
+   MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME));
+
+   frameworkInfo.setRole(flinkConfig.getString(
+   MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE));
+
+   frameworkInfo.setUser(flinkConfig.getString(

[GitHub] flink pull request #4555: [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMast...

2017-08-18 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4555#discussion_r133975100
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.entrypoint;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
+import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
+import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Map;
+
+/**
+ * Entry point for Mesos per-job clusters.
+ */
+public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
+
+   public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+   // ------------------------------------------------------------------------
+   //  Command-line options
+   // ------------------------------------------------------------------------
+
+   private static final Options ALL_OPTIONS;
+
+   static {
+   ALL_OPTIONS =
+   new Options()
+   .addOption(BootstrapTools.newDynamicPropertiesOption());
+   }
+
+   private MesosConfiguration schedulerConfiguration;
+
+   private MesosServices mesosServices;
+
+   private MesosTaskManagerParameters taskManagerParameters;
+
+   private ContainerSpecification taskManagerContainerSpec;
+
+   public MesosJobClusterEntrypoint(Configuration config) {
+   super(config);
+   }
+
+   @Override
+   protected void initializeServices(Configuration config) throws Exception {
+   super.initializeServices(config);
+
+   final String hostname = config.getString(JobManagerOptions.ADDRESS);
+
+   // Mesos configuration
+   schedulerConfiguration = MesosEntrypointUtils.createMesosSchedulerConfiguration(config, hostname);
+
+   // services
+   mesosServices = MesosServicesUtils.createMesosServices(config, hostname);
+
+   // TM configuration
+   

[jira] [Commented] (FLINK-7300) End-to-end tests are instable on Travis

2017-08-18 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133053#comment-16133053
 ] 

Aljoscha Krettek commented on FLINK-7300:
-

The reason this time is an {{AskTimeoutException}}. The problem is that even a 
normal run of Flink can have exceptions and errors in the log. In our release 
testing we have this section about "running a cluster and verifying that the 
log and output are clear of exceptions and errors". I think in the real world 
the log is never clear of exceptions and errors, even in the case where 
everything went well.

[~till.rohrmann] You think we should maybe just not test for the log being 
clean? I could also add {{AskTimeoutException}} to the list of exceptions that 
we expect to occur. I'm guessing this just sometimes occurs with Akka?
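
As a rough sketch of the whitelist idea (illustrative only; `AskTimeoutException` is just the example entry from this thread, not the actual end-to-end test code):

{code:java}
import java.util.Arrays;
import java.util.List;

final class LogCheckSketch {
    // Exceptions that may legitimately show up in a healthy run; the real
    // list would be curated over time.
    private static final List<String> EXPECTED = Arrays.asList("AskTimeoutException");

    // A log line fails the check only if it mentions an exception or error
    // that is not on the whitelist.
    static boolean isUnexpected(String logLine) {
        boolean suspicious = logLine.contains("Exception") || logLine.contains("ERROR");
        return suspicious && EXPECTED.stream().noneMatch(logLine::contains);
    }
}
{code}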

> End-to-end tests are instable on Travis
> ---
>
> Key: FLINK-7300
> URL: https://issues.apache.org/jira/browse/FLINK-7300
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> It seems like the end-to-end tests are instable, causing the {{misc}} build 
> profile to sporadically fail.
> Incorrect matched output:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/258569408/log.txt?X-Amz-Expires=30=20170731T060526Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4ef9ff5e60fe06db53a84be8d73775a46cb595a8caeb806b05dbbf824d3b69e8
> Another failure example of a different cause then the above, also on the 
> end-to-end tests:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/258841693/log.txt?X-Amz-Expires=30=20170731T060007Z=AWS4-HMAC-SHA256=AKIAJRYRXRSVGNKPKO5A/20170731/us-east-1/s3/aws4_request=host=4a106b3990228b7628c250cc15407bc2c131c8332e1a94ad68d649fe8d32d726



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7446) Support to define an existing field as the rowtime field for TableSource

2017-08-18 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133050#comment-16133050
 ] 

Xingcan Cui commented on FLINK-7446:


Thanks for the response, [~fhueske]. From my perspective, I prefer the latter 
option (create new interfaces/operators...). Maybe we can provide one or more 
default watermark generators. Users could use them directly by providing some 
parameters (e.g., the watermark interval and the expected delay relative to the 
latest rowtime). Moreover, if the provided watermark generators do not meet their 
requirements, users can implement their own. What do you think, [~jark]?
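
For illustration, a minimal sketch of such a default generator against the DataStream API (the element type and the choice of field 0 as the rowtime are made up):

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// A possible "default" watermark generator: the watermark trails the maximum
// seen rowtime by a user-supplied expected delay.
class DefaultRowtimeExtractor extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, String>> {

    DefaultRowtimeExtractor(Time expectedDelay) {
        super(expectedDelay);
    }

    @Override
    public long extractTimestamp(Tuple2<Long, String> element) {
        return element.f0; // the existing field used as the rowtime
    }
}
{code}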

> Support to define an existing field as the rowtime field for TableSource
> 
>
> Key: FLINK-7446
> URL: https://issues.apache.org/jira/browse/FLINK-7446
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> Currently {{DefinedRowtimeAttribute}} can define an appended rowtime field 
> for a {{TableSource}}. But it would be helpful if we can support to define an 
> existing field as the rowtime field. Just like registering a DataStream, the 
> rowtime field can be appended but also can replace an existing field.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133044#comment-16133044
 ] 

ASF GitHub Bot commented on FLINK-7477:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4566

[FLINK-7477] [FLINK-7480] Various improvements to Flink scripts



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink hadoop-env-improvements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4566.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4566


commit 6b4d7e5e09dcd913fbb9c84c59fc8a10e6c662cc
Author: Aljoscha Krettek 
Date:   2017-08-18T14:39:41Z

[FLINK-7477] Use "hadoop classpath" to augment classpath when available

This improves the out-of-box experience on GCE and AWS, both of which
don't set a HADOOP_CLASSPATH but have "hadoop" available on the $PATH.

commit f63e2d03d739014f0cd94634d731e552a02c76d9
Author: Aljoscha Krettek 
Date:   2017-08-18T14:40:55Z

[FLINK-7480] Set HADOOP_CONF_DIR to sane default if not set

This improves the out-of-box experience on GCE and AWS, both of which
don't set HADOOP_CONF_DIR by default but use /etc/hadoop/conf




> Use "hadoop classpath" to augment classpath when available
> --
>
> Key: FLINK-7477
> URL: https://issues.apache.org/jira/browse/FLINK-7477
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Currently, some cloud environments don't properly put the Hadoop jars into 
> {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}}) at all. We should 
> check in {{config.sh}} if the {{hadoop}} binary is on the path and augment 
> our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
> our scripts.
> This will improve the out-of-box experience of users that otherwise have to 
> manually set {{HADOOP_CLASSPATH}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4566: [FLINK-7477] [FLINK-7480] Various improvements to ...

2017-08-18 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4566

[FLINK-7477] [FLINK-7480] Various improvements to Flink scripts



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink hadoop-env-improvements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4566.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4566


commit 6b4d7e5e09dcd913fbb9c84c59fc8a10e6c662cc
Author: Aljoscha Krettek 
Date:   2017-08-18T14:39:41Z

[FLINK-7477] Use "hadoop classpath" to augment classpath when available

This improves the out-of-box experience on GCE and AWS, both of which
don't set a HADOOP_CLASSPATH but have "hadoop" available on the $PATH.

commit f63e2d03d739014f0cd94634d731e552a02c76d9
Author: Aljoscha Krettek 
Date:   2017-08-18T14:40:55Z

[FLINK-7480] Set HADOOP_CONF_DIR to sane default if not set

This improves the out-of-box experience on GCE and AWS, both of which
don't set HADOOP_CONF_DIR by default but use /etc/hadoop/conf




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2017-08-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7477:

Fix Version/s: 1.4.0

> Use "hadoop classpath" to augment classpath when available
> --
>
> Key: FLINK-7477
> URL: https://issues.apache.org/jira/browse/FLINK-7477
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Currently, some cloud environments don't properly put the Hadoop jars into 
> {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}}) at all. We should 
> check in {{config.sh}} if the {{hadoop}} binary is on the path and augment 
> our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
> our scripts.
> This will improve the out-of-box experience of users that otherwise have to 
> manually set {{HADOOP_CLASSPATH}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7480) Set HADOOP_CONF_DIR to sane default if not set

2017-08-18 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-7480:
---

 Summary: Set HADOOP_CONF_DIR to sane default if not set
 Key: FLINK-7480
 URL: https://issues.apache.org/jira/browse/FLINK-7480
 Project: Flink
  Issue Type: Improvement
  Components: Startup Shell Scripts
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 1.4.0


Currently, both AWS and GCE don't have a {{HADOOP_CONF_DIR}} set by default. 
This makes the out-of-box experience on these cloud environments bad, because 
not setting it results in errors whose cause is not obvious.

In case {{HADOOP_CONF_DIR}} is not set, we should check whether {{/etc/hadoop/conf}} 
exists and, if so, set {{HADOOP_CONF_DIR}} to that.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7471) Improve bounded OVER support non-retract method AGG

2017-08-18 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133031#comment-16133031
 ] 

Jark Wu commented on FLINK-7471:


Hi [~fhueske], I think [~sunjincheng121] means that retraction for over 
aggregates is an optimization in most cases, but not in all cases. For example, 
min/max with retraction needs to store all records of the group in the 
min/max accumulator. It's hard to say that the overhead of retraction mode 
(storing all records) is always less than that of non-retraction mode 
(recomputing over all records of a group). That's why the over window should 
also support non-retract AGGs. The over window mode (retract/non-retract) 
depends on whether all the aggregates implement the retract method, and the 
retract method is no longer mandatory for over windows.
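
To make the min/max point concrete, a minimal sketch (not Flink code) of why a retractable MIN has to keep every value of the group in its accumulator:

{code:java}
import java.util.TreeMap;

// A multiset of seen values (value -> count) is needed so that retracting
// the current minimum can reveal the next-smallest value.
final class RetractableMinAccumulator {
    private final TreeMap<Long, Integer> counts = new TreeMap<>();

    void accumulate(long v) {
        counts.merge(v, 1, Integer::sum);
    }

    void retract(long v) {
        counts.computeIfPresent(v, (k, c) -> c == 1 ? null : c - 1);
    }

    Long min() {
        return counts.isEmpty() ? null : counts.firstKey();
    }
}
{code}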

> Improve bounded OVER support non-retract method AGG
> ---
>
> Key: FLINK-7471
> URL: https://issues.apache.org/jira/browse/FLINK-7471
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently BOUNDED OVER WINDOW only support have {{retract}} method AGG. In 
> this JIRA. will add non-retract method support.
> What do you think? [~fhueske]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

2017-08-18 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4530
  
Thanks for the comment, @aljoscha. IMO, making the `timeServiceManager` 
protected would indeed minimise the impact on `AbstractStreamOperator`, though 
it may introduce duplicated code in the subclasses. We make some trade-offs 
here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133027#comment-16133027
 ] 

ASF GitHub Bot commented on FLINK-7245:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4530
  
Thanks for the comment, @aljoscha. IMO, making the `timeServiceManager` 
protected would indeed minimise the impact on `AbstractStreamOperator`, though 
it may introduce duplicated code in the subclasses. We make some trade-offs 
here.


> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as delayed by 
> the downstream operators since their timestamps must be less than or equal to 
> the corresponding triggers. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding new generated results are 
> emitted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7479) Support to retrieve the past event by physical offset

2017-08-18 Thread Dian Fu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-7479:
---
Summary: Support to retrieve the past event by physical offset   (was: 
Support to retrieve the past event by an offset )

> Support to retrieve the past event by physical offset 
> --
>
> Key: FLINK-7479
> URL: https://issues.apache.org/jira/browse/FLINK-7479
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, it is already possible to retrieve events matched to the specified 
> pattern in {{IterativeCondition.Context}}. However, there are also requirements 
> to retrieve events by a physical offset. The retrieved events may not be 
> matched to any pattern.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7479) Support to retrieve the past event by an offset

2017-08-18 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133007#comment-16133007
 ] 

Dian Fu commented on FLINK-7479:


This is to support the {{PREV}} clause in {{MATCH_RECOGNIZE}}. As described in 
the [doc|https://docs.oracle.com/database/121/DWHSG/pattern.htm#DWHSG8996], 
{{PREV}} defines an expression over a previous row by physical offset. This 
means that in the {{filter}} method of {{IterativeCondition}}, we may also need 
to access past events by physical offset, and the accessed events may not be 
matched by any pattern.
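
A sketch of how this could look from the condition's point of view, using the `getEventByOffset` method proposed in the corresponding PR (its exact signature is assumed here, and `Event`/`getPrice` are made-up types):

{code:java}
import org.apache.flink.cep.pattern.conditions.IterativeCondition;

// Made-up event type for the example.
class Event {
    private final double price;
    Event(double price) { this.price = price; }
    double getPrice() { return price; }
}

// A PREV(price, 1)-style condition: compare against the row one physical
// offset back, whether or not that row was matched by any pattern.
class PrevPriceCondition extends IterativeCondition<Event> {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        Event prev = ctx.getEventByOffset(1); // proposed API; signature assumed
        return prev != null && value.getPrice() > prev.getPrice();
    }
}
{code}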

> Support to retrieve the past event by an offset 
> 
>
> Key: FLINK-7479
> URL: https://issues.apache.org/jira/browse/FLINK-7479
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, it is already possible to retrieve events matched to the specified 
> pattern in {{IterativeCondition.Context}}. However, there are also requirements 
> to retrieve events by a physical offset. The retrieved events may not be 
> matched to any pattern.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4553: [FLINK-7642] [docs] Add very obvious warning about outdat...

2017-08-18 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/4553
  
Afaik the problem is that the doc builders are independent of the Flink 
version, so we use the same toolchain for building the 0.8 docs and the 1.4 docs.
The `flink.conf` file in 
https://svn.apache.org/repos/infra/infrastructure/buildbot/aegis/buildmaster/master1/projects
 contains the builder config. I introduced @twalthr to its ugly details a few 
weeks ago; maybe he can look into it. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7429) Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132182#comment-16132182
 ] 

ASF GitHub Bot commented on FLINK-7429:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/4565

[FLINK-7429] [kinesis] Add migration test coverage for Flink 1.2 and 1.3

## What is the purpose of the change

The migration tests for the Kinesis consumer did not cover Flink 1.2 and 
1.3. This pull request fixes that.

It also fixes a minor bug, discovered while adding these new migration tests, 
where restoring from empty state behaved differently across Flink versions.

## Brief change log

- (1st commit) generalize `FlinkKinesisConsumerMigrationTest` to cover all 
Flink versions & add stored savepoints for Flink 1.2 and 1.3.
- (2nd commit) Fix different restore behaviour when restoring empty state.

## Verifying this change

Previous behaviours should be covered by existing tests.
The additional tests in `FlinkKinesisConsumerMigrationTest` are simply an 
extra guard that we were doing things correctly.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-7429

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4565.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4565


commit 04e73db786eb2cc7129ea6a8fbe05ca156ef3bce
Author: Tzu-Li (Gordon) Tai 
Date:   2017-08-18T03:27:38Z

[FLINK-7429] [kinesis] Add IT tests for migration from 1.2 / 1.3

commit 47fbdf7495a95682a4da69a5e9a060cdf4c496b3
Author: Tzu-Li (Gordon) Tai 
Date:   2017-08-18T13:17:27Z

[FLINK-7429] [kinesis] Unify empty state restore behaviour across 1.1 / 1.2 
/ 1.3

Prior to this commit, when restoring empty state from previous Flink
versions, the behaviour was different for each version. For older
versions, restoring empty state results in `null`. For newer versions,
restoring empty state results in an empty map.

We want an empty map to represent "this is a restored run, but there
was no state for us", and null to represent "this is not a restored
run".




> Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer
> ---
>
> Key: FLINK-7429
> URL: https://issues.apache.org/jira/browse/FLINK-7429
> Project: Flink
>  Issue Type: Test
>  Components: Kinesis Connector, Tests
>Affects Versions: 1.2.1, 1.4.0, 1.3.2
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, the `FlinkKinesisConsumerMigrationTest` only tests restore from 
> Flink 1.1.
> We should extend that to also verify restoring from 1.2 and 1.3.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4565: [FLINK-7429] [kinesis] Add migration test coverage...

2017-08-18 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/4565

[FLINK-7429] [kinesis] Add migration test coverage for Flink 1.2 and 1.3

## What is the purpose of the change

The migration tests for the Kinesis consumer did not cover Flink 1.2 and 
1.3. This pull request fixes that.

It also fixes a minor bug, discovered while adding these new migration tests, 
where restoring from empty state behaved differently across Flink versions.

## Brief change log

- (1st commit) generalize `FlinkKinesisConsumerMigrationTest` to cover all 
Flink versions & add stored savepoints for Flink 1.2 and 1.3.
- (2nd commit) Fix different restore behaviour when restoring empty state.

## Verifying this change

Previous behaviours should be covered by existing tests.
The additional tests in `FlinkKinesisConsumerMigrationTest` are simply an 
extra guard that we were doing things correctly.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-7429

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4565.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4565


commit 04e73db786eb2cc7129ea6a8fbe05ca156ef3bce
Author: Tzu-Li (Gordon) Tai 
Date:   2017-08-18T03:27:38Z

[FLINK-7429] [kinesis] Add IT tests for migration from 1.2 / 1.3

commit 47fbdf7495a95682a4da69a5e9a060cdf4c496b3
Author: Tzu-Li (Gordon) Tai 
Date:   2017-08-18T13:17:27Z

[FLINK-7429] [kinesis] Unify empty state restore behaviour across 1.1 / 1.2 
/ 1.3

Prior to this commit, when restoring empty state from previous Flink
versions, the behaviour was different for each version. For older
versions, restoring empty state results in `null`. For newer versions,
restoring empty state results in an empty map.

We want an empty map to represent "this is a restored run, but there
was no state for us", and null to represent "this is not a restored
run".




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132174#comment-16132174
 ] 

ASF GitHub Bot commented on FLINK-7245:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4530
  
I think the approach is OK, though I personally would have preferred to 
make the instance variable protected and override `processWatermark()` to 
minimise impact on `AbstractStreamOperator`, which now has an extra method 
call. (though that will probably be inlined)
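
A sketch of that alternative, under the assumption that `timeServiceManager` were made protected (illustrative only, not the change in the PR):

{code:java}
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;

// The subclass holds the watermark back instead of emitting it immediately.
public abstract class HoldingBackOperator<OUT> extends AbstractStreamOperator<OUT> {

    private Watermark pendingWatermark;

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {          // assumes protected visibility
            timeServiceManager.advanceWatermark(mark);
        }
        pendingWatermark = mark; // emit via output.emitWatermark(...) once results are out
    }
}
{code}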


> Enhance the operators to support holding back watermarks
> 
>
> Key: FLINK-7245
> URL: https://issues.apache.org/jira/browse/FLINK-7245
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> Currently the watermarks are applied and emitted by the 
> {{AbstractStreamOperator}} instantly. 
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>   if (timeServiceManager != null) {
>   timeServiceManager.advanceWatermark(mark);
>   }
>   output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these 
> watermarks (e.g., join or aggregate results) may be regarded as delayed by 
> the downstream operators since their timestamps must be less than or equal to 
> the corresponding triggers. 
> This issue aims to add another "working mode", which supports holding back 
> watermarks, to current operators. These watermarks should be blocked and 
> stored by the operators until all the corresponding new generated results are 
> emitted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4530: [FLINK-7245] [stream] Support holding back watermarks wit...

2017-08-18 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4530
  
I think the approach is OK, though I personally would have preferred to 
make the instance variable protected and override `processWatermark()` to 
minimise impact on `AbstractStreamOperator`, which now has an extra method 
call. (though that will probably be inlined)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132172#comment-16132172
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4554
  
Yes, I created an alternative PR for that: 
https://github.com/apache/flink/pull/4564

My only worry is that all the tests in that PR would also pass with the 
other child-first class loader implementation from the RocksDB test, meaning 
that we don't actually have coverage for something that I discovered by having 
the class loader in the client. If we're fine with that I will close this PR 
and merge the other one once it's reviewed.  


> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4554: [FLINK-7442] Add option for using child-first classloader...

2017-08-18 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4554
  
Yes, I created an alternative PR for that: 
https://github.com/apache/flink/pull/4564

My only worry is that all the tests in that PR would also pass with the 
other child-first class loader implementation from the RocksDB test, meaning 
that we don't actually have coverage for something that I discovered by having 
the class loader in the client. If we're fine with that I will close this PR 
and merge the other one once it's reviewed. 👌 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132168#comment-16132168
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4564

[FLINK-7442] Add option for using child-first classloader for loading user 
code

This is an alternative to #4554 that does not make the client class loader 
configurable.

## What is the purpose of the change

This PR introduces a new core option (`classloader.resolve-order: 
child-first`) that allows using a child-first class loader for user code. The 
default is still to use a parent-first class loader.
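
For readers unfamiliar with the term, a minimal sketch of child-first delegation (illustrative; not the actual class loader added by this PR):

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

// Try the user-code jars first and fall back to the parent class loader
// only when the class is not found there.
final class ChildFirstClassLoaderSketch extends URLClassLoader {

    ChildFirstClassLoaderSketch(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name);              // child first
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, false); // then parent
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
{code}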

This also does a minor refactoring in the way the blob manager retrieves 
the cleanup interval. It's now also read from the `Configuration`, since we 
already have the `Configuration` for the class loader settings.

## Brief change log

 - Introduce new option
 - Pass `Configuration` through to all places where we previously created a user class loader
 - Instantiate correct class loader based on config

## Verifying this change

This PR introduces new end-to-end tests that verify the new feature in a 
complete Flink workflow, including starting the program using `bin/flink run`.

## Does this pull request potentially affect one of the following parts:


This affects class loading, which is quite important to get right.

## Documentation

 - the new flag is documented in the config documentation


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-7441-child-first-classloader-alternative

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4564.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4564


commit 33bd4cf5160ec64cbaace876b305922b804cc3a1
Author: Aljoscha Krettek 
Date:   2017-08-14T12:53:14Z

[FLINK-7442] Add option for using a child-first classloader for loading 
user code




> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4564: [FLINK-7442] Add option for using child-first clas...

2017-08-18 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4564

[FLINK-7442] Add option for using child-first classloader for loading user 
code

This is an alternative to #4554 that does not make the client class loader 
configurable.

## What is the purpose of the change

This PR introduces a new core option (`classloader.resolve-order: 
child-first`) that allows using a child-first class loader for user code. The 
default is still to use a parent-first class loader.

This also does a minor refactoring in the way the blob manager retrieves 
the cleanup interval. It's now also read from the `Configuration`, since we 
already have the `Configuration` for the class loader settings.

## Brief change log

 - Introduce new option
 - Pass `Configuration` through to all places where we previously created a user class loader
 - Instantiate correct class loader based on config

## Verifying this change

This PR introduces new end-to-end tests that verify the new feature in a 
complete Flink workflow, including starting the program using `bin/flink run`.

## Does this pull request potentially affect one of the following parts:


This affects class loading, which is quite important to get right.

## Documentation

 - the new flag is documented in the config documentation


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-7441-child-first-classloader-alternative

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4564.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4564


commit 33bd4cf5160ec64cbaace876b305922b804cc3a1
Author: Aljoscha Krettek 
Date:   2017-08-14T12:53:14Z

[FLINK-7442] Add option for using a child-first classloader for loading 
user code




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7479) Support to retrieve the past event by an offset

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132141#comment-16132141
 ] 

ASF GitHub Bot commented on FLINK-7479:
---

GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4563

[FLINK-7479] [cep] Support to retrieve the past event by an offset

## What is the purpose of the change

*Currently, it is already possible to retrieve events matched to the specified 
pattern in IterativeCondition.Context. However, there are also requirements to 
retrieve events by a physical offset. The retrieved events may not be matched 
to any pattern.*


## Brief change log

  - *Add API retain() in Pattern*
  - *Buffer the past events in NFA.process*
  - *Access the past events by the newly added API getEventByOffset in 
IterativeCondition.Context*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added test in IterativeConditionsITCase*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink FLINK-7479

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4563.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4563


commit e9feb55e6c2f7d32ec6266049be0ea9bbff967b2
Author: Dian Fu 
Date:   2017-08-18T12:50:25Z

[FLINK-7479] [cep] Support to retrieve the past event by an offset




> Support to retrieve the past event by an offset 
> 
>
> Key: FLINK-7479
> URL: https://issues.apache.org/jira/browse/FLINK-7479
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently, it is already possible to retrieve events matched to the specified 
> pattern in {{IterativeCondition.Context}}. However, there are also requirements 
> to retrieve events by a physical offset. The retrieved events may not be 
> matched to any pattern.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4563: [FLINK-7479] [cep] Support to retrieve the past ev...

2017-08-18 Thread dianfu
GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/4563

[FLINK-7479] [cep] Support to retrieve the past event by an offset

## What is the purpose of the change

*Currently, it is already possible to retrieve events matched to the specified 
pattern in IterativeCondition.Context. However, there are also requirements to 
retrieve events by a physical offset. The retrieved events may not be matched 
to any pattern.*


## Brief change log

  - *Add API retain() in Pattern*
  - *Buffer the past events in NFA.process*
  - *Access the past events by the newly added API getEventByOffset in 
IterativeCondition.Context*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added test in IterativeConditionsITCase*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink FLINK-7479

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4563.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4563


commit e9feb55e6c2f7d32ec6266049be0ea9bbff967b2
Author: Dian Fu 
Date:   2017-08-18T12:50:25Z

[FLINK-7479] [cep] Support to retrieve the past event by an offset




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-7479) Support to retrieve the past event by an offset

2017-08-18 Thread Dian Fu (JIRA)
Dian Fu created FLINK-7479:
--

 Summary: Support to retrieve the past event by an offset 
 Key: FLINK-7479
 URL: https://issues.apache.org/jira/browse/FLINK-7479
 Project: Flink
  Issue Type: Sub-task
Reporter: Dian Fu
Assignee: Dian Fu


Currently, it is already possible to retrieve events matched to the specified pattern 
in {{IterativeCondition.Context}}. However, there are also requirements to 
retrieve events by a physical offset. The retrieved events may not be matched 
to any pattern.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7473) Possible Leak in GlobalWindows

2017-08-18 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132125#comment-16132125
 ] 

Nico Kruber commented on FLINK-7473:


Another fix: properly fix your event time usage (which is the reason for the 
max-int watermark in your case):

[Flink 1.3 Docs: Event Time|https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html] 
states: "... the program needs to either use sources that directly define 
event time for the data and emit watermarks themselves, or the program must 
inject a Timestamp Assigner & Watermark Generator after the sources."
Various sub-topics go into more detail, e.g. [Flink 1.3 Docs: Generating Timestamps / Watermarks|https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html].
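
In code, the "inject a Timestamp Assigner & Watermark Generator after the sources" option looks roughly like this (a sketch; `MyEvent` and its timestamp field are placeholders):

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

final class EventTimeWiringSketch {

    // Placeholder event type carrying its own event-time timestamp.
    public static class MyEvent {
        public long timestamp;
        public MyEvent() {}
        public MyEvent(long timestamp) { this.timestamp = timestamp; }
    }

    static DataStream<MyEvent> withEventTime(StreamExecutionEnvironment env) {
        return env
            .fromElements(new MyEvent(1L), new MyEvent(2L)) // stand-in for the real source
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
                @Override
                public long extractAscendingTimestamp(MyEvent e) {
                    return e.timestamp; // event time taken from the record itself
                }
            });
    }
}
{code}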

> Possible Leak in GlobalWindows
> --
>
> Key: FLINK-7473
> URL: https://issues.apache.org/jira/browse/FLINK-7473
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: See attached project
>Reporter: Steve Jerman
> Attachments: timerIssue.zip
>
>
> Hi,
> I have been wrestling with an issue with GlobalWindows. It seems like it leaks 
> instances of InternalTimer.
> I can't tell if it's a bug or my code, so I created a 'minimal' project that 
> has the issue...
> If you run the unit test in the attached project and then monitor the heap, you 
> will see that the number of InternalTimers continually increases. I added code 
> to explicitly delete them... it doesn't seem to help.
> If I comment out registerEventTimeTimer... no leak :)
> My suspicion is that PURGE/FIRE_AND_PURGE is leaving the timer in limbo.
> Steve





[jira] [Commented] (FLINK-7129) Dynamically changing patterns

2017-08-18 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132111#comment-16132111
 ] 

Dawid Wysakowicz commented on FLINK-7129:
-

Hi [~dian.fu]
Yes, the idea was to enable multiple patterns in a single stream with option 2. 
Unfortunately, while implementing it, I stumbled across a problem: right now it 
is impossible to distribute a Pattern to all instances of operators for all 
keys. There might also be a problem that some keys were not spotted before a new 
Pattern arrived. 

We could implement a solution where multiple static Patterns are applied to the 
same stream, but I don't see much advantage over the current solution.

> Dynamically changing patterns
> -
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> An umbrella task for introducing a mechanism for injecting patterns through 
> coStream





[jira] [Commented] (FLINK-7471) Improve bounded OVER support non-retract method AGG

2017-08-18 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132090#comment-16132090
 ] 

sunjincheng commented on FLINK-7471:


We know performance is poor if all aggregates are computed completely from 
scratch for each record. That's why we use the {{retract}} method to implement 
the BOUNDED OVER WINDOW. I think that's the best choice.
We have also added a comment on the {{retract}} method in {{AggregateFunction}}:
{code}
...
This(retract) function must be implemented for
datastream bounded over aggregate.
{code}
So I think we did the right thing in the aggregate design and implementation of 
the bounded OVER window. 

But while using the {{retract}} method is an optimized implementation of the 
bounded OVER window, we cannot force the user to implement the {{retract}} 
method. In particular, if the business scenario is an append-only table that 
never generates retract information, the user may define an aggregate function 
that does not have a retract method. We cannot force the user to implement the 
retract method just because of the way the bounded OVER window is realized. And 
I think we may not need to refactor the original code much, but can add the 
non-retract method support on top of it. 

[~fhueske] I think your concern is valid, but I do not really like that the 
implementation of the OVER window forces the user to implement 
{{AggregateFunction#retract}}.
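
For illustration, a minimal sketch of a UDAGG that provides a {{retract}} method 
(the {{CountAgg}} name and accumulator are assumptions for the example; 
{{accumulate}}/{{retract}} are discovered by convention rather than declared on 
the base class):

{code:java}
import org.apache.flink.table.functions.AggregateFunction;

public class CountAgg extends AggregateFunction<Long, CountAgg.CountAcc> {

    public static class CountAcc {
        public long count;
    }

    @Override
    public CountAcc createAccumulator() {
        return new CountAcc();
    }

    @Override
    public Long getValue(CountAcc acc) {
        return acc.count;
    }

    public void accumulate(CountAcc acc, Object value) {
        acc.count += 1;
    }

    // Without this method, the bounded OVER window currently cannot use the
    // function; the proposal above is to also support functions that omit it.
    public void retract(CountAcc acc, Object value) {
        acc.count -= 1;
    }
}
{code}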



> Improve bounded OVER support non-retract method AGG
> ---
>
> Key: FLINK-7471
> URL: https://issues.apache.org/jira/browse/FLINK-7471
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently BOUNDED OVER WINDOW only supports AGGs that have a {{retract}} 
> method. This JIRA will add non-retract method support.
> What do you think? [~fhueske]





[jira] [Commented] (FLINK-7423) Always reuse an instance to get elements from the inputFormat

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132062#comment-16132062
 ] 

ASF GitHub Bot commented on FLINK-7423:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4525
  
@StephanEwen why are `null` values permitted if not in the contract of the 
input formats?


> Always reuse an instance  to get elements from the inputFormat 
> ---
>
> Key: FLINK-7423
> URL: https://issues.apache.org/jira/browse/FLINK-7423
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Xu Pingyong
>Assignee: Xu Pingyong
>
> In InputFormatSourceFunction.java:
> {code:java}
> OUT nextElement = serializer.createInstance();
> while (isRunning) {
>     format.open(splitIterator.next());
>     // for each element we also check if cancel
>     // was called by checking the isRunning flag
>     while (isRunning && !format.reachedEnd()) {
>         nextElement = format.nextRecord(nextElement);
>         if (nextElement != null) {
>             ctx.collect(nextElement);
>         } else {
>             break;
>         }
>     }
>     format.close();
>     completedSplitsCounter.inc();
>     if (isRunning) {
>         isRunning = splitIterator.hasNext();
>     }
> }
> {code}
> the format may return a different element or null from nextRecord, which may 
> cause an exception.





[GitHub] flink issue #4525: [FLINK-7423] Always reuse an instance to get elements fro...

2017-08-18 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4525
  
@StephanEwen why are `null` values permitted if not in the contract of the 
input formats?




[jira] [Created] (FLINK-7478) Update documentation for sql insert and api change in TableAPI & SQL

2017-08-18 Thread lincoln.lee (JIRA)
lincoln.lee created FLINK-7478:
--

 Summary: Update documentation for sql insert and api change in 
TableAPI & SQL
 Key: FLINK-7478
 URL: https://issues.apache.org/jira/browse/FLINK-7478
 Project: Flink
  Issue Type: New Feature
Reporter: lincoln.lee
Assignee: lincoln.lee
Priority: Minor








[jira] [Commented] (FLINK-7358) Add implicitly converts support for User-defined function

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132056#comment-16132056
 ] 

ASF GitHub Bot commented on FLINK-7358:
---

Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4534
  
Hi @fhueske, thank you for reminding me :-). I updated the description; feel 
free to tell me if any part of the description is inappropriate. 

Thanks, jincheng


> Add  implicitly converts support for User-defined function
> --
>
> Key: FLINK-7358
> URL: https://issues.apache.org/jira/browse/FLINK-7358
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, if a user defines a UDF as follows:
> {code}
> object Func extends ScalarFunction {
>   def eval(a: Int, b: Long): String = {
> ...
>   }
> }
> {code}
> And if the table schema is (a: Int, b: Int, c: String), then we cannot call 
> the UDF as `Func('a, 'b)`. So
> I want to add implicit conversion when we call a UDF. The implicit conversion 
> rule is:
> BYTE_TYPE_INFO -> SHORT_TYPE_INFO -> INT_TYPE_INFO -> LONG_TYPE_INFO -> 
> FLOAT_TYPE_INFO -> DOUBLE_TYPE_INFO
> *Note:
> This JIRA covers only the Table API; SQL will be fixed in 
> https://issues.apache.org/jira/browse/CALCITE-1908.*
> What do you think? [~fhueske]
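
For illustration, a minimal sketch of the proposed widening check (the helper 
name and the use of boxed classes are assumptions for the example, not Flink 
API):

{code:java}
import java.util.Arrays;
import java.util.List;

public final class ImplicitWidening {

    // The proposed order: a value may be widened to any type to its right.
    private static final List<Class<?>> WIDENING_ORDER = Arrays.asList(
        Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class);

    public static boolean canImplicitlyConvert(Class<?> from, Class<?> to) {
        int i = WIDENING_ORDER.indexOf(from);
        int j = WIDENING_ORDER.indexOf(to);
        return i >= 0 && j >= 0 && i <= j;
    }

    public static void main(String[] args) {
        // Int -> Long is allowed, so Func('a, 'b) with b: Int can call eval(Int, Long).
        System.out.println(canImplicitlyConvert(Integer.class, Long.class)); // true
        System.out.println(canImplicitlyConvert(Long.class, Integer.class)); // false
    }
}
{code}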





[GitHub] flink issue #4534: [FLINK-7358][table]Add implicitly converts support for Us...

2017-08-18 Thread sunjincheng121
Github user sunjincheng121 commented on the issue:

https://github.com/apache/flink/pull/4534
  
Hi @fhueske, thank you for reminding me :-). I updated the description; feel 
free to tell me if any part of the description is inappropriate. 

Thanks, jincheng




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132042#comment-16132042
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on the issue:

https://github.com/apache/flink/pull/4355
  
1. Use HeapMapView/HeapListView as the default implementation
2. Add initialize and cleanUp interfaces to GeneratedAggregations


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132041#comment-16132041
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4358
  
Rebased onto `master`, but had to drag in #4402 early to fix the end-to-end 
tests failing due to spurious warnings. The test failure you observed was 
actually a test instability introduced with #4238 for which I added a hotfix to 
this PR.


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should cover use cases for BLOBs that are 
> permanently stored for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should cover BLOB offloading for logs, RPC, etc., 
> which does not even have to be backed by files.





[GitHub] flink issue #4355: [FLINK-7206] [table] Implementation of DataView to suppor...

2017-08-18 Thread kaibozhou
Github user kaibozhou commented on the issue:

https://github.com/apache/flink/pull/4355
  
1. Use HeapMapView/HeapListView as the default implementation
2. Add initialize and cleanUp interfaces to GeneratedAggregations




[GitHub] flink issue #4358: [FLINK-7068][blob] change BlobService sub-classes for per...

2017-08-18 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4358
  
Rebased onto `master`, but had to drag in #4402 early to fix the end-to-end 
tests failing due to spurious warnings. The test failure you observed was 
actually a test instability introduced with #4238 for which I added a hotfix to 
this PR.




[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-18 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133925176
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -179,13 +214,19 @@ class AggregationCodeGenerator(
|  ${aggMapping(i)},
|  (${accTypes(i)}) accs.getField($i));""".stripMargin
   } else {
+val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) {
--- End diff --

Yes, it makes sense.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132016#comment-16132016
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133925097
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -405,6 +481,17 @@ class AggregationCodeGenerator(
   }
 }
 
+val aggFuncCode = Seq(
+  genSetAggregationResults,
+  genAccumulate,
+  genRetract,
+  genCreateAccumulators,
+  genSetForwardedFields,
+  genSetConstantFlags,
+  genCreateOutputRow,
+  genMergeAccumulatorsPair,
+  genResetAccumulator).mkString("\n")
--- End diff --

It makes sense. I have looked at ProcessFunctionWithCleanupState; cleanUp 
should be called whenever cleanupState is called.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-18 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133925097
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -405,6 +481,17 @@ class AggregationCodeGenerator(
   }
 }
 
+val aggFuncCode = Seq(
+  genSetAggregationResults,
+  genAccumulate,
+  genRetract,
+  genCreateAccumulators,
+  genSetForwardedFields,
+  genSetConstantFlags,
+  genCreateOutputRow,
+  genMergeAccumulatorsPair,
+  genResetAccumulator).mkString("\n")
--- End diff --

It makes sense. I have looked at ProcessFunctionWithCleanupState; cleanUp 
should be called whenever cleanupState is called.




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132017#comment-16132017
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133925176
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
@@ -179,13 +214,19 @@ class AggregationCodeGenerator(
|  ${aggMapping(i)},
|  (${accTypes(i)}) accs.getField($i));""".stripMargin
   } else {
+val setDataView = if (accConfig.isDefined && accConfig.get.isUseState) {
--- End diff --

Yes, it makes sense.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[GitHub] flink pull request #4355: [FLINK-7206] [table] Implementation of DataView to...

2017-08-18 Thread kaibozhou
Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133924725
  
--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java ---
@@ -135,4 +138,172 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
--- End diff --

Yes, CountDistinct is just used for the test case here.
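
For illustration, a hedged sketch of how such a test aggregate could count 
distinct values with the MapView introduced in this PR (the exact accumulate 
signature is an assumption for the example, not the PR's code):

{code:java}
// Count a value only the first time it is seen; the MapView is backed by
// state when the DataView support from this PR is enabled.
public void accumulate(CountDistinctAccum accumulator, String value) throws Exception {
    if (!accumulator.map.contains(value)) {
        accumulator.map.put(value, 1);
        accumulator.count += 1;
    }
}
{code}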




[jira] [Commented] (FLINK-7206) Implementation of DataView to support state access for UDAGG

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16132010#comment-16132010
 ] 

ASF GitHub Bot commented on FLINK-7206:
---

Github user kaibozhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/4355#discussion_r133924725
  
--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java ---
@@ -135,4 +138,172 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
accumulator.count -= iWeight;
}
}
+
+   /**
+* CountDistinct accumulator.
+*/
+   public static class CountDistinctAccum {
+   public MapView map;
+   public long count;
+   }
+
+   /**
+* CountDistinct aggregate.
+*/
+   public static class CountDistinct extends AggregateFunction {
--- End diff --

Yes, CountDistinct is just used for the test case here.


> Implementation of DataView to support state access for UDAGG
> 
>
> Key: FLINK-7206
> URL: https://issues.apache.org/jira/browse/FLINK-7206
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kaibo Zhou
>Assignee: Kaibo Zhou
>
> Implementation of MapView and ListView to support state access for UDAGG.





[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131997#comment-16131997
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4554
  
I think what you mentioned is one more reason to not use this in too many 
places for now, but only inside the TaskManager / Tasks. Let's introduce that 
as a tool that users can use to resolve conflicts and gather some feedback 
before we pull that into client / queryableStateClient / etc...


> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>






[GitHub] flink issue #4554: [FLINK-7442] Add option for using child-first classloader...

2017-08-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4554
  
I think what you mentioned is one more reason to not use this in too many 
places for now, but only inside the TaskManager / Tasks. Let's introduce that 
as a tool that users can use to resolve conflicts and gather some feedback 
before we pull that into client / queryableStateClient / etc...
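
For context, a minimal child-first classloader sketch (an assumption for 
illustration, not the actual implementation from this PR):

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

public class ChildFirstClassLoader extends URLClassLoader {

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    // Look in the user-code jars first...
                    c = findClass(name);
                } catch (ClassNotFoundException e) {
                    // ...and only then delegate to the parent classloader.
                    // (A production version would always delegate core
                    // patterns such as "java." to the parent.)
                    c = super.loadClass(name, false);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
{code}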




[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131995#comment-16131995
 ] 

ASF GitHub Bot commented on FLINK-6442:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r133921697
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }

   /**
+* Evaluates a SQL Select query on registered tables and retrieves the result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, INTERSECT, EXCEPT, VALUES, " +
--- End diff --

SqlParser.parseStmt() actually calls SqlParser.parseQuery(), so they're the 
same; it cannot help us distinguish the SQL type, so SqlKind is used here. 
SqlKind.QUERY consists of: SELECT, EXCEPT, INTERSECT, UNION, VALUES, ORDER_BY, 
EXPLICIT_TABLE.


> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in 
> SQL
> ---
>
> Key: FLINK-6442
> URL: https://issues.apache.org/jira/browse/FLINK-6442
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently in TableAPI there's only a registration method for source tables; 
> when we use SQL to write a streaming job, we have to add an additional part for 
> the sink, like the Table API does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke tableAPI’s writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to datastream first and then invoke addSink 
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see the sink table is always a derived table because its 
> 'schema' is inferred from the result type of the upstream query.
> Compared to a traditional RDBMS, which supports DML syntax, a query with a target 
> output could be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
> tEnv.registerTableSink("targetTable", new YourSink)
> val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite’s grammar: 
> {code}
>  insert:( INSERT | UPSERT ) INTO tablePrimary
>  [ '(' column [, column ]* ')' ]
>  query
> {code}
> I'd like to extend the Flink Table API to support this feature. See design doc: 
> https://goo.gl/n3phK5

[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131996#comment-16131996
 ] 

ASF GitHub Bot commented on FLINK-6442:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r133920847
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
--- End diff --

There's more than one place that needs to get the default QueryConfig from the 
table environment.


> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in 
> SQL
> ---
>
> Key: FLINK-6442
> URL: https://issues.apache.org/jira/browse/FLINK-6442
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently in TableAPI there's only a registration method for source tables; 
> when we use SQL to write a streaming job, we have to add an additional part for 
> the sink, like the Table API does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke tableAPI’s writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to datastream first and then invoke addSink 
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From the API we can see the sink table is always a derived table because its 
> 'schema' is inferred from the result type of the upstream query.
> Compared to a traditional RDBMS, which supports DML syntax, a query with a target 
> output could be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
> tEnv.registerTableSink("targetTable", new YourSink)
> val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
> val result = tEnv.sql(sql)
> {code}
> It is supported by Calcite’s grammar: 
> {code}
>  insert:( INSERT | UPSERT ) INTO tablePrimary
>  [ '(' column [, column ]* ')' ]
>  query
> {code}
> I'd like to extend the Flink Table API to support this feature. See design doc: 
> https://goo.gl/n3phK5





[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-18 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r133920847
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/queryConfig.scala ---
@@ -19,9 +19,21 @@
 package org.apache.flink.table.api
 
 import _root_.java.io.Serializable
+
 import org.apache.flink.api.common.time.Time
 
-class QueryConfig private[table] extends Serializable {}
+class QueryConfig private[table] extends Serializable {
--- End diff --

There's more than one place that needs to get the default QueryConfig from the 
table environment.




[GitHub] flink pull request #3829: [FLINK-6442] [table] Extend TableAPI Support Sink ...

2017-08-18 Thread lincoln-lil
Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3829#discussion_r133921697
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -502,26 +513,140 @@ abstract class TableEnvironment(val config: TableConfig) {
 *   tEnv.sql(s"SELECT * FROM $table")
 * }}}
 *
-* @param query The SQL query to evaluate.
+* @param sql The SQL string to evaluate.
 * @return The result of the query as Table.
 */
-  def sql(query: String): Table = {
+  @deprecated
+  def sql(sql: String): Table = {
 val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
 // parse the sql query
-val parsed = planner.parse(query)
+val parsed = planner.parse(sql)
 // validate the sql query
 val validated = planner.validate(parsed)
 // transform to a relational tree
 val relational = planner.rel(validated)
-
 new Table(this, LogicalRelNode(relational.rel))
   }

   /**
+* Evaluates a SQL Select query on registered tables and retrieves the result as a
+* [[Table]].
+*
+* All tables referenced by the query must be registered in the TableEnvironment. But
+* [[Table.toString]] will automatically register an unique table name and return the
+* table name. So it allows to call SQL directly on tables like this:
+*
+* {{{
+*   val table: Table = ...
+*   // the table is not registered to the table environment
+*   tEnv.sqlSelect(s"SELECT * FROM $table")
+* }}}
+*
+* @param sql The SQL string to evaluate.
+* @return The result of the query as Table or null of the DML insert operation.
+*/
+  def sqlQuery(sql: String): Table = {
+val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
+// parse the sql query
+val parsed = planner.parse(sql)
+if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
+  // validate the sql query
+  val validated = planner.validate(parsed)
+  // transform to a relational tree
+  val relational = planner.rel(validated)
+  new Table(this, LogicalRelNode(relational.rel))
+} else {
+  throw new TableException(
+"Unsupported sql query! sqlQuery Only accept SELECT, UNION, INTERSECT, EXCEPT, VALUES, " +
--- End diff --

SqlParser.parseStmt() actually calls SqlParser.parseQuery(), so they're the 
same; it cannot help us distinguish the SQL type, so SqlKind is used here. 
SqlKind.QUERY consists of: SELECT, EXCEPT, INTERSECT, UNION, VALUES, ORDER_BY, 
EXPLICIT_TABLE.




[jira] [Commented] (FLINK-7423) Always reuse an instance to get elements from the inputFormat

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131993#comment-16131993
 ] 

ASF GitHub Bot commented on FLINK-7423:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4525
  
The logic is currently not consistent with the contract of the input formats: 
a return value of null is not an "end of split" indicator.

Also, the description mentions that this adds a test, which I cannot find 
in the diff...


> Always reuse an instance  to get elements from the inputFormat 
> ---
>
> Key: FLINK-7423
> URL: https://issues.apache.org/jira/browse/FLINK-7423
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Xu Pingyong
>Assignee: Xu Pingyong
>
> In InputFormatSourceFunction.java:
> {code:java}
> OUT nextElement = serializer.createInstance();
> while (isRunning) {
>     format.open(splitIterator.next());
>     // for each element we also check if cancel
>     // was called by checking the isRunning flag
>     while (isRunning && !format.reachedEnd()) {
>         nextElement = format.nextRecord(nextElement);
>         if (nextElement != null) {
>             ctx.collect(nextElement);
>         } else {
>             break;
>         }
>     }
>     format.close();
>     completedSplitsCounter.inc();
>     if (isRunning) {
>         isRunning = splitIterator.hasNext();
>     }
> }
> {code}
> the format may return a different element or null from nextRecord, which may 
> cause an exception.





[GitHub] flink issue #4525: [FLINK-7423] Always reuse an instance to get elements fro...

2017-08-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4525
  
The logic is currently not consistent with the contract of the input formats: 
a return value of null is not an "end of split" indicator.

Also, the description mentions that this adds a test, which I cannot find 
in the diff...
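
For illustration, a hedged sketch of a read loop that follows that contract, 
where only {{reachedEnd()}} terminates a split and null records are skipped 
rather than treated as end-of-split (names follow the snippet quoted above):

{code:java}
OUT reuse = serializer.createInstance();
while (isRunning) {
    format.open(splitIterator.next());
    while (isRunning && !format.reachedEnd()) {
        OUT next = format.nextRecord(reuse);
        if (next != null) {
            ctx.collect(next);
            reuse = next; // keep reusing whatever instance the format returned
        }
        // on null: simply poll again; only reachedEnd() ends the split
    }
    format.close();
    completedSplitsCounter.inc();
    if (isRunning) {
        isRunning = splitIterator.hasNext();
    }
}
{code}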




[jira] [Created] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2017-08-18 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-7477:
---

 Summary: Use "hadoop classpath" to augment classpath when available
 Key: FLINK-7477
 URL: https://issues.apache.org/jira/browse/FLINK-7477
 Project: Flink
  Issue Type: Bug
  Components: Startup Shell Scripts
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


Currently, some cloud environments don't properly put the Hadoop jars into 
{{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}} at all). We should 
check in {{config.sh}} whether the {{hadoop}} binary is on the path and augment 
our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
our scripts.

This will improve the out-of-the-box experience for users that otherwise have to 
manually set {{HADOOP_CLASSPATH}}.





[jira] [Updated] (FLINK-7475) ListState support update

2017-08-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7475:

Component/s: DataStream API

> ListState support update
> 
>
> Key: FLINK-7475
> URL: https://issues.apache.org/jira/browse/FLINK-7475
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API
>Reporter: yf
>
> If I want to update the list, 
> I have to do two steps: 
> listState.clear() 
> for (Element e : myList) { 
> listState.add(e); 
> } 
> Why can't I update the state with: 
> listState.update(myList)?
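
For illustration, a minimal helper capturing today's two-step replacement (the 
helper name is an assumption; the proposed {{update}} would make it a single 
call):

{code:java}
import java.util.List;
import org.apache.flink.api.common.state.ListState;

final class ListStateUtil {

    // What a built-in listState.update(myList) would do today.
    static <T> void replaceAll(ListState<T> state, List<T> newElements) throws Exception {
        state.clear();
        for (T e : newElements) {
            state.add(e);
        }
    }
}
{code}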





[jira] [Commented] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131967#comment-16131967
 ] 

ASF GitHub Bot commented on FLINK-7347:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4468
  
Thanks a lot for contributing this fix, @ymost! 👍 

I merged into master, could you please close this PR?


> "removeAll" is extremely inefficient in 
> MessageAcknowledgingSourceBase.notifyCheckpointComplete
> ---
>
> Key: FLINK-7347
> URL: https://issues.apache.org/jira/browse/FLINK-7347
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.1
>Reporter: Yonatan Most
>Assignee: Yonatan Most
> Fix For: 1.4.0
>
>
> Observe this line in 
> {{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}:
> {code}
> idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
> {code}
> The implementation of {{removeAll}} is such that if the set is smaller than 
> the collection to remove, then the set is iterated and every item is checked 
> for containment in the collection. The type of {{checkpoint.f1}} here is 
> {{ArrayList}}, so the {{contains}} action is very inefficient, and it is 
> performed for every item in {{idsProcessedButNotAcknowledged}}.
> In our pipeline we had about 10 million events processed, and the checkpoint 
> was stuck on the {{removeAll}} call for hours.
> A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} 
> instead of an {{ArrayList}}. The fact that it's a list is not really used 
> anywhere.
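
For illustration, a minimal standalone demo of why the call degrades (an 
assumption for this example, not Flink code): {{AbstractSet.removeAll}} iterates 
the set when the set is not larger than the argument, calling 
{{ArrayList.contains}} (a linear scan) once per element, so the whole call is 
O(n*m); with a {{HashSet}} argument each containment check is O(1).

{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RemoveAllCost {
    public static void main(String[] args) {
        int n = 20_000; // the ticket saw ~10 million ids; 20k already shows the effect
        Set<Long> processed = new HashSet<>();
        List<Long> ackedList = new ArrayList<>();
        Set<Long> ackedSet = new HashSet<>();
        for (long i = 0; i < n; i++) {
            processed.add(i);
            ackedList.add(i);
            ackedSet.add(i);
        }

        long start = System.nanoTime();
        // Set size is not greater than the list size, so the set is iterated
        // and ArrayList.contains runs once per element: O(n * m).
        new HashSet<>(processed).removeAll(ackedList);
        System.out.println("ArrayList argument: " + (System.nanoTime() - start) / 1_000_000 + " ms");

        start = System.nanoTime();
        // The fix from this ticket: a HashSet argument makes removeAll linear.
        new HashSet<>(processed).removeAll(ackedSet);
        System.out.println("HashSet argument:   " + (System.nanoTime() - start) / 1_000_000 + " ms");
    }
}
{code}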





[GitHub] flink issue #4454: [hotfix][docs] Add section in docs about writing unit/int...

2017-08-18 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4454
  
@twalthr, kind reminder :)




[GitHub] flink issue #4468: [FLINK-7347] [streaming] Keep ids for current checkpoint ...

2017-08-18 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4468
  
Thanks a lot for contributing this fix, @ymost! 👍 

I merged into master, could you please close this PR?




[jira] [Closed] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete

2017-08-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7347.
---
Resolution: Fixed

Implemented in 76f1022884fe7b291fe81028a29896fb5b5ca5c9 on master.

> "removeAll" is extremely inefficient in 
> MessageAcknowledgingSourceBase.notifyCheckpointComplete
> ---
>
> Key: FLINK-7347
> URL: https://issues.apache.org/jira/browse/FLINK-7347
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.1
>Reporter: Yonatan Most
>Assignee: Yonatan Most
> Fix For: 1.4.0
>
>
> Observe this line in 
> {{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}:
> {code}
> idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
> {code}
> The implementation of {{removeAll}} is such that if the set is smaller than 
> the collection to remove, then the set is iterated and every item is checked 
> for containment in the collection. The type of {{checkpoint.f1}} here is 
> {{ArrayList}}, so the {{contains}} action is very inefficient, and it is 
> performed for every item in {{idsProcessedButNotAcknowledged}}.
> In our pipeline we had about 10 million events processed, and the checkpoint 
> was stuck on the {{removeAll}} call for hours.
> A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} 
> instead of an {{ArrayList}}. The fact that it's a list is not really used 
> anywhere.





[jira] [Assigned] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete

2017-08-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-7347:
---

Assignee: Yonatan Most

> "removeAll" is extremely inefficient in 
> MessageAcknowledgingSourceBase.notifyCheckpointComplete
> ---
>
> Key: FLINK-7347
> URL: https://issues.apache.org/jira/browse/FLINK-7347
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.1
>Reporter: Yonatan Most
>Assignee: Yonatan Most
> Fix For: 1.4.0
>
>
> Observe this line in 
> {{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}:
> {code}
> idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
> {code}
> The implementation of {{removeAll}} is such that if the set is smaller than 
> the collection to remove, then the set is iterated and every item is checked 
> for containment in the collection. The type of {{checkpoint.f1}} here is 
> {{ArrayList}}, so the {{contains}} action is very inefficient, and it is 
> performed for every item in {{idsProcessedButNotAcknowledged}}.
> In our pipeline we had about 10 million events processed, and the checkpoint 
> was stuck on the {{removeAll}} call for hours.
> A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} 
> instead of an {{ArrayList}}. The fact that it's a list is not really used 
> anywhere.





[jira] [Updated] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete

2017-08-18 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7347:

Fix Version/s: 1.4.0

> "removeAll" is extremely inefficient in 
> MessageAcknowledgingSourceBase.notifyCheckpointComplete
> ---
>
> Key: FLINK-7347
> URL: https://issues.apache.org/jira/browse/FLINK-7347
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.1
>Reporter: Yonatan Most
> Fix For: 1.4.0
>
>
> Observe this line in 
> {{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}:
> {code}
> idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
> {code}
> The implementation of {{removeAll}} is such that if the set is smaller than 
> the collection to remove, then the set is iterated and every item is checked 
> for containment in the collection. The type of {{checkpoint.f1}} here is 
> {{ArrayList}}, so the {{contains}} action is very inefficient, and it is 
> performed for every item in {{idsProcessedButNotAcknowledged}}.
> In our pipeline we had about 10 million events processed, and the checkpoint 
> was stuck on the {{removeAll}} call for hours.
> A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} 
> instead of an {{ArrayList}}. The fact that it's a list is not really used 
> anywhere.





[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI

2017-08-18 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131948#comment-16131948
 ] 

Jark Wu commented on FLINK-7465:


I think the problem is how to design the state. Even if it is a bit array or 
bitmap, it is expensive to de/serialize the state every time {{accumulate}} is 
called.

> Add build-in BloomFilterCount on TableAPI
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA, we use a BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch is as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr], I'd appreciate it if you could give me some advice. :-)
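
For illustration, a minimal Bloom filter sketch following the description above 
(a standalone demo that derives the k positions by double hashing; an assumption 
for the example, not the Hive implementation referenced in the issue):

{code:java}
import java.util.BitSet;

public class SimpleBloomFilter {
    private final BitSet bits;
    private final int m; // number of bits
    private final int k; // number of hash functions

    public SimpleBloomFilter(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // Derive the i-th position from two base hashes (double hashing).
    private int position(Object element, int i) {
        int h1 = element.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
        return Math.floorMod(h1 + i * h2, m);
    }

    // To add an element, set the bits at all k positions to 1.
    public void add(Object element) {
        for (int i = 0; i < k; i++) {
            bits.set(position(element, i));
        }
    }

    // false: definitely absent; true: present, or a false positive.
    public boolean mightContain(Object element) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(element, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SimpleBloomFilter f = new SimpleBloomFilter(18, 3); // m = 18, k = 3 as in the figure
        f.add("x"); f.add("y"); f.add("z");
        System.out.println(f.mightContain("x")); // true
        System.out.println(f.mightContain("w")); // false (unless a false positive)
    }
}
{code}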





[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131944#comment-16131944
 ] 

ASF GitHub Bot commented on FLINK-7402:
---

GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4562

[FLINK-7402] Fix ineffective null check in NettyMessage#write()

## What is the purpose of the change

Fix ineffective null check in NettyMessage#write()

## Brief change log

  - *Add null check and remove unnecessary null check*


## Verifying this change


No test case

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-7402

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4562.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4562


commit e289b37cf8adce56876b47ee33ed161058bf64e6
Author: zjureel 
Date:   2017-08-18T06:43:34Z

[FLINK-7402] Fix ineffective null check in NettyMessage#write()




> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the null check in finally block:
> {code}
> finally {
>     if (buffer != null) {
>         buffer.recycle();
>     }
> {code}
> But buffer has been dereferenced in the try block without guard.
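
For illustration, a hedged sketch of the corrected shape ({{Buffer}}, 
{{allocateBuffer}} and {{serializeInto}} are hypothetical stand-ins, not the 
actual NettyMessage code; the point is that the guard only helps if the try 
block cannot dereference {{buffer}} before it is assigned):

{code:java}
Buffer writeMessage() throws Exception {
    Buffer buffer = null;
    try {
        buffer = allocateBuffer();   // may throw before buffer is assigned
        serializeInto(buffer);       // dereference only after assignment
        return buffer;
    }
    catch (Throwable t) {
        if (buffer != null) {        // the guard is now effective on every path
            buffer.recycle();
        }
        throw t;
    }
}
{code}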





[GitHub] flink pull request #4562: [FLINK-7402] Fix ineffective null check in NettyMe...

2017-08-18 Thread zjureel
GitHub user zjureel opened a pull request:

https://github.com/apache/flink/pull/4562

[FLINK-7402] Fix ineffective null check in NettyMessage#write()

## What is the purpose of the change

Fix ineffective null check in NettyMessage#write()

## Brief change log

  - *Add null check and remove unnecessary null check*


## Verifying this change


No test case

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-7402

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4562.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4562


commit e289b37cf8adce56876b47ee33ed161058bf64e6
Author: zjureel 
Date:   2017-08-18T06:43:34Z

[FLINK-7402] Fix ineffective null check in NettyMessage#write()






[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131926#comment-16131926
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4554
  
Probably, yes. I also don't know what other stuff this could break, though. 
For example, I don't know if queryable state, which starts netty (and Akka) 
stuff, will still work with this. Unfortunately, we don't have any end-to-end 
tests for that.


> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>






[GitHub] flink issue #4554: [FLINK-7442] Add option for using child-first classloader...

2017-08-18 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4554
  
Probably, yes. I also don't know what other stuff this could break, though. 
For example, I don't know if queryable state, which starts netty (and Akka) 
stuff, will still work with this. Unfortunately, we don't have any end-to-end 
tests for that.




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131913#comment-16131913
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
Implemented a fixed-size pool of producers; please check the last commit.

If we run out of producers in the pool, an exception is thrown, aborting the 
ongoing snapshot.
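
For illustration, a minimal sketch of such a fixed-size pool (names are 
assumptions for the example, not the PR's actual code):

{code:java}
import java.util.ArrayDeque;
import java.util.Collection;

final class FixedProducerPool<P> {

    private final ArrayDeque<P> free = new ArrayDeque<>();

    FixedProducerPool(Collection<P> producers) {
        free.addAll(producers);
    }

    // Called when a new transaction/snapshot needs a producer.
    synchronized P acquire() {
        P producer = free.poll();
        if (producer == null) {
            // Mirrors the behaviour described above: fail the ongoing snapshot
            // instead of creating an unbounded number of transactional producers.
            throw new IllegalStateException(
                "Producer pool exhausted; aborting the ongoing snapshot");
        }
        return producer;
    }

    // Called once the producer's transaction is committed or aborted.
    synchronized void release(P producer) {
        free.push(producer);
    }
}
{code}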


> Add Apache Kafka 0.11 connector
> ---
>
> Key: FLINK-6988
> URL: https://issues.apache.org/jira/browse/FLINK-6988
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.3.1
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Kafka 0.11 (it will be released very soon) adds support for transactions. 
> Thanks to that, Flink might be able to implement a Kafka sink supporting 
> "exactly-once" semantics. The API changes and the whole transaction support are 
> described in 
> [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic implementation of existing BucketingSink. New 
> FlinkKafkaProducer011 would 
> * upon creation begin transaction, store transaction identifiers into the 
> state and would write all incoming data to an output Kafka topic using that 
> transaction
> * on `snapshotState` call, it would flush the data and write in state 
> information that current transaction is pending to be committed
> * on `notifyCheckpointComplete` we would commit this pending transaction
> * in case of crash between `snapshotState` and `notifyCheckpointComplete` we 
> either abort this pending transaction (if not every participant successfully 
> saved the snapshot) or restore and commit it. 





[GitHub] flink issue #4239: [FLINK-6988] flink-connector-kafka-0.11 with exactly-once...

2017-08-18 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4239
  
Implemented a fixed-size pool of producers; please check the last commit.

If we run out of producers in the pool, an exception is thrown, aborting the 
ongoing snapshot.




[jira] [Commented] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131907#comment-16131907
 ] 

ASF GitHub Bot commented on FLINK-7476:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4561

 [FLINK-7476][streaming] Continue using previous transaction on failures

First commit comes from #4557.

Previously when using TwoPhaseCommitSinkFunction, if there was some 
intermittent failure in "beginTransaction", not only did the snapshot that 
triggered this call fail, but any subsequent write requests would fail as 
well. This rendered the sink unusable until an application restart.

This PR changes the order of execution of the methods of a `PublicEvolving` 
class that has not yet been released.

The PR is covered by the existing tests in `TwoPhaseCommitSinkFunctionTest`, 
as well as by two additional test cases (`testContinueWorkOnBeginTransactionFailure` 
would fail before this PR). A sketch of the idea follows below.
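
A minimal sketch of the recovery idea, under assumed names (`Txn`, 
`pendingCommits`); this is not the PR's actual diff, only one way to realize 
"continue using the previous transaction":

```java
// Sketch: begin the next transaction *before* retiring the current one, so
// an intermittent beginTransaction() failure leaves the sink with a valid
// transaction for subsequent writes (assumed names, not Flink's actual code).
class TwoPhaseCommitSketch {

    static class Txn {
        void flush() {}
        void commit() {}
    }

    private Txn current = new Txn();
    private final java.util.List<Txn> pendingCommits = new java.util.ArrayList<>();

    Txn beginTransaction() {
        // may fail intermittently, e.g. on a transient broker error
        return new Txn();
    }

    void write(String record) {
        // `current` is always valid here, even if a previous snapshot failed
    }

    void snapshotState() {
        current.flush();
        // if this throws, `current` is untouched and later writes still work;
        // only the snapshot that triggered the call fails
        Txn next = beginTransaction();
        pendingCommits.add(current);
        current = next;
    }

    void notifyCheckpointComplete() {
        for (Txn t : pendingCommits) {
            t.commit();
        }
        pendingCommits.clear();
    }
}
```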

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink 2phase-recover

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4561.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4561


commit 813c8121b406b60ea478f4765b33b7d75c221d1e
Author: Piotr Nowojski 
Date:   2017-08-14T14:40:45Z

[hotfix][streaming] Simplify state of TwoPhaseCommitSinkFunction

commit 249b6419af3d15414dc411838aba624d0ee2f3a1
Author: Piotr Nowojski 
Date:   2017-08-14T13:09:39Z

[hotfix][tests] Implement AutoCloseable in TestHarness

commit a0ae6324dcaded581a3352c8ff4bae6e86e01fde
Author: Piotr Nowojski 
Date:   2017-08-17T13:46:47Z

[hotfix][streaming] Refactor TwoPhaseCommitSinkFunctionTest

commit e3c7dc83ccbce2505be5769fa0827b09dfa54875
Author: Piotr Nowojski 
Date:   2017-08-17T10:29:16Z

[FLINK-7476][streaming] Continue using previous transaction on failures

Previously when using TwoPhaseCommitSinkFunction, if there was some 
intermittent failure in "beginTransaction", not only did the snapshot that 
triggered this call fail, but any subsequent write requests would fail as 
well. This rendered the sink unusable until an application restart.




> Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
> --
>
> Key: FLINK-7476
> URL: https://issues.apache.org/jira/browse/FLINK-7476
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently when using TwoPhaseCommitSinkFunction, if there is some 
> intermittent failure in "beginTransaction", not only does the snapshot that 
> triggered this call fail, but any subsequent write requests will fail as 
> well, rendering the sink unusable until an application restart.
> This issue is in code that hasn't been released yet.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4561: [FLINK-7476][streaming] Continue using previous tr...

2017-08-18 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4561

 [FLINK-7476][streaming] Continue using previous transaction on failures

The first commit comes from #4557.

Previously when using TwoPhaseCommitSinkFunction, if there was some 
intermittent failure in "beginTransaction", not only did the snapshot that 
triggered this call fail, but any subsequent write requests would fail as 
well. This rendered the sink unusable until an application restart.

This PR changes the order of execution of the methods of a `PublicEvolving` 
class that has not yet been released.

The PR is covered by the existing tests in `TwoPhaseCommitSinkFunctionTest`, 
as well as by two additional test cases (`testContinueWorkOnBeginTransactionFailure` 
would fail before this PR).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink 2phase-recover

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4561.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4561


commit 813c8121b406b60ea478f4765b33b7d75c221d1e
Author: Piotr Nowojski 
Date:   2017-08-14T14:40:45Z

[hotfix][streaming] Simplify state of TwoPhaseCommitSinkFunction

commit 249b6419af3d15414dc411838aba624d0ee2f3a1
Author: Piotr Nowojski 
Date:   2017-08-14T13:09:39Z

[hotfix][tests] Implement AutoCloseable in TestHarness

commit a0ae6324dcaded581a3352c8ff4bae6e86e01fde
Author: Piotr Nowojski 
Date:   2017-08-17T13:46:47Z

[hotfix][streaming] Refactor TwoPhaseCommitSinkFunctionTest

commit e3c7dc83ccbce2505be5769fa0827b09dfa54875
Author: Piotr Nowojski 
Date:   2017-08-17T10:29:16Z

[FLINK-7476][streaming] Continue using previous transaction on failures

Previously when using TwoPhaseCommitSinkFunction, if there was some 
intermittent failure in "beginTransaction", not only did the snapshot that 
triggered this call fail, but any subsequent write requests would fail as 
well. This rendered the sink unusable until an application restart.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction

2017-08-18 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-7476:

Affects Version/s: 1.4.0

> Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
> --
>
> Key: FLINK-7476
> URL: https://issues.apache.org/jira/browse/FLINK-7476
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently when using TwoPhaseCommitSinkFunction, if there is some 
> intermittent failure in "beginTransaction", not only does the snapshot that 
> triggered this call fail, but any subsequent write requests will fail as 
> well, rendering the sink unusable until an application restart.
> This issue is in code that hasn't been released yet.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction

2017-08-18 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-7476:

Fix Version/s: 1.4.0

> Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
> --
>
> Key: FLINK-7476
> URL: https://issues.apache.org/jira/browse/FLINK-7476
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently when using TwoPhaseCommitSinkFunction, if there is some 
> intermittent failure in "beginTransaction", not only does the snapshot that 
> triggered this call fail, but any subsequent write requests will fail as 
> well, rendering the sink unusable until an application restart.
> This issue is in code that hasn't been released yet.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131889#comment-16131889
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4554
  
Ah, I see - that is probably related to reading resources from a jar file 
(like the config), via methods like `getResourceAsStream()`.
I can see that the "delegate to parent after child" lookup logic might not 
quite work for that, yes.

I think that particular bug you encountered is almost an argument for not 
changing the logic on the client yet. Since we would run the entire Flink 
code through that classloader as well, the implications are trickier than in 
the runtime task, where we only instantiate the user-code functions.
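
For context, a minimal sketch of the child-first lookup being discussed, 
assuming only the standard `URLClassLoader` API; a production version would 
also keep always-parent-first packages (e.g. `java.`) out of the child, and 
the `getResource` override shows where the resource problem above comes from:

```java
import java.net.URL;
import java.net.URLClassLoader;

// A minimal sketch of a child-first classloader, not Flink's implementation.
public class ChildFirstClassLoader extends URLClassLoader {

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve)
            throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name);              // child (user jar) first
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, false); // then delegate to parent
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    @Override
    public URL getResource(String name) {
        // child-first resource lookup as well - this is exactly where callers
        // of getResourceAsStream() can end up with the "wrong" resource
        URL resource = findResource(name);
        return resource != null ? resource : super.getResource(name);
    }
}
```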


> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

