[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-23 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591218#comment-16591218
 ] 

vinoyang commented on FLINK-10205:
--

[~isunjin] Can you assign this issue to yourself? If not, you may need to apply 
for the Contributor permission first.

> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today DataSource Task pull InputSplits from JobManager to achieve better 
> performance, however, when a DataSourceTask failed and rerun, it will not get 
> the same splits as its previous version. this will introduce inconsistent 
> result or even data corruption.
> Furthermore,  if there are two executions run at the same time (in batch 
> scenario), this two executions should process same splits.
> we need to fix the issue to make the inputs of a DataSourceTask 
> deterministic. The propose is save all splits into ExecutionVertex and 
> DataSourceTask will pull split from there.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6609: [hotfix][docs] missing breaking metrics page

2018-08-23 Thread GitBox
asfgit closed pull request #6609: [hotfix][docs] missing  breaking metrics 
page
URL: https://github.com/apache/flink/pull/6609
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index a93853c14dc..7d88a36393c 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1486,6 +1486,7 @@ Thus, in order to infer the metric identifier:
   bytesRequestedPerFetch
   stream, shardId
   The bytes requested (2 Mbps / loopFrequencyHz) in a single call to 
getRecords.
+  
   Gauge
 
   


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591208#comment-16591208
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] 
Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212531176
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
 
 Review comment:
   Should we support filter pushdown?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7243) Add ParquetInputFormat

2018-08-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-7243:
--
Labels: pull-request-available  (was: )

> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7243) Add ParquetInputFormat

2018-08-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-7243:
--
Labels: pull-request-available  (was: )

> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591206#comment-16591206
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] 
Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212530605
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
 ##
 @@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Schema converter converts Parquet schema to and from Flink internal types.
+ */
+public class ParquetSchemaConverter {
+   public static final String MAP_KEY = "key";
+   public static final String MAP_VALUE = "value";
+   public static final String LIST_ELEMENT = "array";
+   public static final String MESSAGE_ROOT = "root";
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new 
AvroSchemaConverter();
+
+   public static TypeInformation fromParquetType(MessageType type) {
+   return convertFields(type.getFields());
+   }
+
+   public static MessageType toParquetType(TypeInformation 
typeInformation) {
+   return (MessageType) convertField(null, typeInformation, 
Type.Repetition.OPTIONAL);
+   }
+
+   private static TypeInformation convertFields(List 
parquetFields) {
+   List> types = new ArrayList<>();
+   List names = new ArrayList<>();
+   for (Type field : parquetFields) {
+   TypeInformation subType = convertField(field);
+   if (subType != null) {
+   types.add(subType);
+   names.add(field.getName());
+   }
+   }
+
+   return new RowTypeInfo(types.toArray(new 
TypeInformation[types.size()]),
+   names.toArray(new String[names.size()]));
+   }
+
+   private static TypeInformation convertField(final Type fieldType) {
+   TypeInformation typeInfo = null;
+   if (fieldType.isPrimitive()) {
+   PrimitiveType primitiveType = 
fieldType.asPrimitiveType();
 
 Review comment:
   How about support more types, SqlTimeTypeInfo, Decimal, etc


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7243) Add ParquetInputFormat

2018-08-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-7243:
--
Labels: pull-request-available  (was: )

> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591207#comment-16591207
 ] 

ASF GitHub Bot commented on FLINK-7243:
---

docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] 
Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212530857
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and 
convert to {@link Row}.
+ * It is mainly used to integrate with table API and batch SQL.
+ */
+public class ParquetRowInputFormat extends ParquetInputFormat implements 
ResultTypeQueryable {
 
 Review comment:
   Offer a tablesource for SQL/Table API ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>  Labels: pull-request-available
>
> Add a {{ParquetInputFormat}} to read data from a Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-08-23 Thread GitBox
docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] 
Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212530605
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
 ##
 @@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Schema converter converts Parquet schema to and from Flink internal types.
+ */
+public class ParquetSchemaConverter {
+   public static final String MAP_KEY = "key";
+   public static final String MAP_VALUE = "value";
+   public static final String LIST_ELEMENT = "array";
+   public static final String MESSAGE_ROOT = "root";
+   private static final AvroSchemaConverter SCHEMA_CONVERTER = new 
AvroSchemaConverter();
+
+   public static TypeInformation fromParquetType(MessageType type) {
+   return convertFields(type.getFields());
+   }
+
+   public static MessageType toParquetType(TypeInformation 
typeInformation) {
+   return (MessageType) convertField(null, typeInformation, 
Type.Repetition.OPTIONAL);
+   }
+
+   private static TypeInformation convertFields(List 
parquetFields) {
+   List> types = new ArrayList<>();
+   List names = new ArrayList<>();
+   for (Type field : parquetFields) {
+   TypeInformation subType = convertField(field);
+   if (subType != null) {
+   types.add(subType);
+   names.add(field.getName());
+   }
+   }
+
+   return new RowTypeInfo(types.toArray(new 
TypeInformation[types.size()]),
+   names.toArray(new String[names.size()]));
+   }
+
+   private static TypeInformation convertField(final Type fieldType) {
+   TypeInformation typeInfo = null;
+   if (fieldType.isPrimitive()) {
+   PrimitiveType primitiveType = 
fieldType.asPrimitiveType();
 
 Review comment:
   How about support more types, SqlTimeTypeInfo, Decimal, etc


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-08-23 Thread GitBox
docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] 
Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212531176
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.formats.parquet.utils.RowReadSupport;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * The base InputFormat class to read from Parquet files.
+ * For specific return types the {@link #convert(Row)} method need to be 
implemented.
+ *
+ * Using {@link ParquetRecordReader} to Read files instead of {@link 
org.apache.flink.core.fs.FSDataInputStream},
+ * we override {@link #open(FileInputSplit)} and {@link #close()} to change 
the behaviors.
+ *
+ * @param  The type of record to read.
+ */
+public abstract class ParquetInputFormat extends FileInputFormat 
implements
 
 Review comment:
   Should we support filter pushdown?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] Add parquet input format

2018-08-23 Thread GitBox
docete commented on a change in pull request #6483: [FLINK-7243][flink-formats] 
Add parquet input format
URL: https://github.com/apache/flink/pull/6483#discussion_r212530857
 
 

 ##
 File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A subclass of {@link ParquetInputFormat} to read from Parquet files and 
convert to {@link Row}.
+ * It is mainly used to integrate with table API and batch SQL.
+ */
+public class ParquetRowInputFormat extends ParquetInputFormat implements 
ResultTypeQueryable {
 
 Review comment:
   Offer a tablesource for SQL/Table API ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment

2018-08-23 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591188#comment-16591188
 ] 

buptljy commented on FLINK-10202:
-

[~StephanEwen] Here is my code. I think the checkpoint directory in 
CheckpointCoordinator is still empty even if you set direcotry in 
FsStateBackend.
{code:java}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(15000)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setStateBackend(new FsStateBackend(checkpointDir)){code}
 1. If I don't set state.checkpoint.dir, it will throw exception above.

 2. If i remove the third line, it will be okay.

> Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
> ---
>
> Key: FLINK-10202
> URL: https://issues.apache.org/jira/browse/FLINK-10202
> Project: Flink
>  Issue Type: Improvement
>Reporter: buptljy
>Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
> run a flink job locally, and we're not able to set state.checkpoint.dir for 
> background wrapped cluster, which will cause 
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>   "checkpoints, but no checkpoint directory has been configured. You can 
> " +
>   "configure configure one via key '" + 
> ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment* 
> so that developers can use it to set state.checkpoint.dir for job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-23 Thread JIN SUN (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16591066#comment-16591066
 ] 

JIN SUN commented on FLINK-10205:
-

I would like to work on this issue

> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Reporter: JIN SUN
>Priority: Major
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today DataSource Task pull InputSplits from JobManager to achieve better 
> performance, however, when a DataSourceTask failed and rerun, it will not get 
> the same splits as its previous version. this will introduce inconsistent 
> result or even data corruption.
> Furthermore,  if there are two executions run at the same time (in batch 
> scenario), this two executions should process same splits.
> we need to fix the issue to make the inputs of a DataSourceTask 
> deterministic. The propose is save all splits into ExecutionVertex and 
> DataSourceTask will pull split from there.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-08-23 Thread JIN SUN (JIRA)
JIN SUN created FLINK-10205:
---

 Summary: Batch Job: InputSplit Fault tolerant for DataSourceTask
 Key: FLINK-10205
 URL: https://issues.apache.org/jira/browse/FLINK-10205
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager
Reporter: JIN SUN


Today DataSource Task pull InputSplits from JobManager to achieve better 
performance, however, when a DataSourceTask failed and rerun, it will not get 
the same splits as its previous version. this will introduce inconsistent 
result or even data corruption.

Furthermore,  if there are two executions run at the same time (in batch 
scenario), this two executions should process same splits.

we need to fix the issue to make the inputs of a DataSourceTask deterministic. 
The propose is save all splits into ExecutionVertex and DataSourceTask will 
pull split from there.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10204:
---
Labels: pull-request-available  (was: )

> Job is marked as FAILED after serialization exception
> -
>
> Key: FLINK-10204
> URL: https://issues.apache.org/jira/browse/FLINK-10204
> Project: Flink
>  Issue Type: Bug
>Reporter: Ben La Monica
>Priority: Major
>  Labels: pull-request-available
>
> We have a long running flink job that eventually fails and is shut down due 
> to an internal serialization exception that we keep on getting. Here is the 
> stack trace:
> {code:java}
> 2018-08-23 18:39:48,199 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
> (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
> java.io.IOException: Corrupt stream, found tag: 127
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
> at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
> at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
> at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748){code}
>  
> I think I have tracked down the issue to a mismatch in the 
> serialization/deserialization/copy code in the StreamElementSerializer with 
> regards to the LATENCY_MARKER.
> The Serialization logic writes 3 LONGs and an INT. The copy logic only writes 
> (and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an 
> EOFException, and fixing the copy code causes the test to pass again.
> I've written a unit test that highlights the problem, and have written the 
> code to correct it.
> I'll submit a PR that goes along with it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] benlamonica opened a new pull request #6610: [FLINK-10204] - fix serialization/copy error for LatencyMarker records.

2018-08-23 Thread GitBox
benlamonica opened a new pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610
 
 
   *Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
 - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
 
 - Name the pull request in the form "[FLINK-] [component] Title of the 
pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
 Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
   
 - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
 
 - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).
   
 - Each pull request should address only one issue, not mix up code from 
multiple issues.
 
 - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)
   
 - Once all items of the checklist are addressed, remove the above text and 
this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   
   ## What is the purpose of the change
   
   Fixes a serialization error with regards to the LatencyMarker not having the 
same binary format when serializing, and when copying the byte stream.  The 
following exception occurs after a long running flink job has been running for 
a while:
   
   `2018-08-23 18:39:48,199 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job NAV 
Estimation (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to 
FAILED.
   java.io.IOException: Corrupt stream, found tag: 127
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)`
   
   ## Brief change log
 - *Added LatencyMarker to the StreamElementSerializerTest and then fixed 
the failure by correcting the StreamElementSerializer::copy method.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
- * Commenting out the fix in the class will show that when serializing and 
deserializing, you will encounter an EOFException due to the incorrect copy*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes (modified the equals/hashCode on the LatencyMarker 
class so that the test would pass)
 - The serializers: yes
 - The runtime per-record code paths (performance sensitive): don't know
 - Anything that affects deployment or recovery: JobManager (and its 
com

[jira] [Commented] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590863#comment-16590863
 ] 

ASF GitHub Bot commented on FLINK-10204:


benlamonica opened a new pull request #6610: [FLINK-10204] - fix 
serialization/copy error for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610
 
 
   *Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
 - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
 
 - Name the pull request in the form "[FLINK-] [component] Title of the 
pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
 Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
   
 - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
 
 - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).
   
 - Each pull request should address only one issue, not mix up code from 
multiple issues.
 
 - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)
   
 - Once all items of the checklist are addressed, remove the above text and 
this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   
   ## What is the purpose of the change
   
   Fixes a serialization error with regards to the LatencyMarker not having the 
same binary format when serializing, and when copying the byte stream.  The 
following exception occurs after a long running flink job has been running for 
a while:
   
   `2018-08-23 18:39:48,199 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job NAV 
Estimation (4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to 
FAILED.
   java.io.IOException: Corrupt stream, found tag: 127
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)`
   
   ## Brief change log
 - *Added LatencyMarker to the StreamElementSerializerTest and then fixed 
the failure by correcting the StreamElementSerializer::copy method.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
- * Commenting out the fix in the class will show that when serializing and 
deserializing, you will encounter an EOFException due to the incorrect copy*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes (modified

[jira] [Created] (FLINK-10204) Job is marked as FAILED after serialization exception

2018-08-23 Thread Ben La Monica (JIRA)
Ben La Monica created FLINK-10204:
-

 Summary: Job is marked as FAILED after serialization exception
 Key: FLINK-10204
 URL: https://issues.apache.org/jira/browse/FLINK-10204
 Project: Flink
  Issue Type: Bug
Reporter: Ben La Monica


We have a long running flink job that eventually fails and is shut down due to 
an internal serialization exception that we keep on getting. Here is the stack 
trace:
{code:java}
2018-08-23 18:39:48,199 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NAV Estimation 
(4b5463d76167f9f5aac83a890e8a867d) switched from state FAILING to FAILED.
java.io.IOException: Corrupt stream, found tag: 127
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:219)
at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:140)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:115)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748){code}
 

I think I have tracked down the issue to a mismatch in the 
serialization/deserialization/copy code in the StreamElementSerializer with 
regards to the LATENCY_MARKER.

The Serialization logic writes 3 LONGs and an INT. The copy logic only writes 
(and reads) a LONG and 2 INTs. Adding a test for the LatencyMarker throws an 
EOFException, and fixing the copy code causes the test to pass again.

I've written a unit test that highlights the problem, and have written the code 
to correct it.

I'll submit a PR that goes along with it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590701#comment-16590701
 ] 

ASF GitHub Bot commented on FLINK-8532:
---

Guibo-Pan edited a comment on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-415499383
 
 
   Hi @StephanEwen , your suggestion lead me to deep thinks, and the extreme 
performance is exactly what we want. I am going to ask you for more 
suggestions. I prefer to initialize the partitioner instance with a random 
partition, however in the design ahead, the partitioner doesn't know the target 
range.
   The alternative is like this:
   ```
   private final int[] returnArray = new int[] 
{ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1)};
   
   @Override
   public int[] selectChannels(SerializationDelegate> record,
int numberOfOutputChannels) {
this.returnArray[0] = (this.returnArray[0] + 1) % 
numberOfOutputChannels;
return this.returnArray;
   }
   ```
Please tell me how you think, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Assignee: Guibo Pan
>Priority: Major
>  Labels: pull-request-available
>
> In some conditions, RebalancePartitioner doesn't balance data correctly 
> because it use the same value for selecting next operators.
> RebalancePartitioner initializes its partition id using the same value in 
> every threads, so it indeed balances data, but at one moment the amount of 
> data in each operator is skew.
> Particularly, when the data rate of  former operators is equal , data skew 
> becomes severe.
>  
>  
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator(map1, map2) contains three subtasks(subtask 1, 2, 3, 4, 5, 
> 6).
> map1          map2
>  st1              st4
>  st2              st5
>  st3              st6
>  
> At the beginning, every subtasks in map1 sends data to st4 in map2 because 
> they use the same initial parition id.
> Next time the map1 receive data st1,2,3 send data to st5 because they 
> increment its partition id when they processed former data.
> In my environment,  it takes twice the time to process data when I use 
> RebalancePartitioner  as long as I use other partitioners(rescale, keyby).
>  
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Guibo-Pan edited a comment on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition

2018-08-23 Thread GitBox
Guibo-Pan edited a comment on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-415499383
 
 
   Hi @StephanEwen , your suggestion lead me to deep thinks, and the extreme 
performance is exactly what we want. I am going to ask you for more 
suggestions. I prefer to initialize the partitioner instance with a random 
partition, however in the design ahead, the partitioner doesn't know the target 
range.
   The alternative is like this:
   ```
   private final int[] returnArray = new int[] 
{ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE - 1)};
   
   @Override
   public int[] selectChannels(SerializationDelegate> record,
int numberOfOutputChannels) {
this.returnArray[0] = (this.returnArray[0] + 1) % 
numberOfOutputChannels;
return this.returnArray;
   }
   ```
Please tell me how you think, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10186) FindBugs warnings: Random object created and used only once

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590696#comment-16590696
 ] 

ASF GitHub Bot commented on FLINK-10186:


Guibo-Pan commented on a change in pull request #6591: [FLINK-10186] Fix 
FindBugs warnings: Random object created and used o…
URL: https://github.com/apache/flink/pull/6591#discussion_r212410516
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
 ##
 @@ -115,7 +115,7 @@ public BufferSpiller(IOManager ioManager, int pageSize) 
throws IOException {
this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % 
tempDirs.length];
 
byte[] rndBytes = new byte[32];
-   new Random().nextBytes(rndBytes);
 
 Review comment:
   It's inside constructor method, and just randoms for once in an instance.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> FindBugs warnings: Random object created and used only once
> ---
>
> Key: FLINK-10186
> URL: https://issues.apache.org/jira/browse/FLINK-10186
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Hiroaki Yoshida
>Priority: Major
>  Labels: pull-request-available
>
> FindBugs-3.0.1 ([http://findbugs.sourceforge.net/]) reported a 
> DMI_RANDOM_USED_ONLY_ONCE warning on master:
> {code:java}
> H B DMI: Random object created and used only once in new 
> org.apache.flink.streaming.runtime.io.BufferSpiller(IOManager, int)  At 
> BufferSpiller.java:[line 118]
> {code}
> The description of the bug is as follows:
> {quote}*DMI: Random object created and used only once 
> (DMI_RANDOM_USED_ONLY_ONCE)*
> This code creates a java.util.Random object, uses it to generate one random 
> number, and then discards the Random object. This produces mediocre quality 
> random numbers and is inefficient. If possible, rewrite the code so that the 
> Random object is created once and saved, and each time a new random number is 
> required invoke a method on the existing Random object to obtain it.
> If it is important that the generated Random numbers not be guessable, you 
> must not create a new Random for each random number; the values are too 
> easily guessable. You should strongly consider using a 
> java.security.SecureRandom instead (and avoid allocating a new SecureRandom 
> for each random number needed).
> [http://findbugs.sourceforge.net/bugDescriptions.html#DMI_RANDOM_USED_ONLY_ONCE]
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Guibo-Pan commented on a change in pull request #6591: [FLINK-10186] Fix FindBugs warnings: Random object created and used o…

2018-08-23 Thread GitBox
Guibo-Pan commented on a change in pull request #6591: [FLINK-10186] Fix 
FindBugs warnings: Random object created and used o…
URL: https://github.com/apache/flink/pull/6591#discussion_r212410516
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
 ##
 @@ -115,7 +115,7 @@ public BufferSpiller(IOManager ioManager, int pageSize) 
throws IOException {
this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % 
tempDirs.length];
 
byte[] rndBytes = new byte[32];
-   new Random().nextBytes(rndBytes);
 
 Review comment:
   It's inside constructor method, and just randoms for once in an instance.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590683#comment-16590683
 ] 

Kostas Kloudas commented on FLINK-10203:


No problem [~artsem.semianenka]! 
I assigned the issue to you. 

> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Assignee: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achieve this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortunately there are a few official Hadoop distributive which latest 
> version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As 
> the result Flinks Hadoop connector can't work with this distributives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-10203:
--

Assignee: Artsem Semianenka

> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Assignee: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achieve this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortunately there are a few official Hadoop distributive which latest 
> version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As 
> the result Flinks Hadoop connector can't work with this distributives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10186) FindBugs warnings: Random object created and used only once

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590646#comment-16590646
 ] 

ASF GitHub Bot commented on FLINK-10186:


Guibo-Pan commented on a change in pull request #6591: [FLINK-10186] Fix 
FindBugs warnings: Random object created and used o…
URL: https://github.com/apache/flink/pull/6591#discussion_r212410516
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
 ##
 @@ -115,7 +115,7 @@ public BufferSpiller(IOManager ioManager, int pageSize) 
throws IOException {
this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % 
tempDirs.length];
 
byte[] rndBytes = new byte[32];
-   new Random().nextBytes(rndBytes);
 
 Review comment:
   It's inside constructor method, and just randoms for once in an instance.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> FindBugs warnings: Random object created and used only once
> ---
>
> Key: FLINK-10186
> URL: https://issues.apache.org/jira/browse/FLINK-10186
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Hiroaki Yoshida
>Priority: Major
>  Labels: pull-request-available
>
> FindBugs-3.0.1 ([http://findbugs.sourceforge.net/]) reported a 
> DMI_RANDOM_USED_ONLY_ONCE warning on master:
> {code:java}
> H B DMI: Random object created and used only once in new 
> org.apache.flink.streaming.runtime.io.BufferSpiller(IOManager, int)  At 
> BufferSpiller.java:[line 118]
> {code}
> The description of the bug is as follows:
> {quote}*DMI: Random object created and used only once 
> (DMI_RANDOM_USED_ONLY_ONCE)*
> This code creates a java.util.Random object, uses it to generate one random 
> number, and then discards the Random object. This produces mediocre quality 
> random numbers and is inefficient. If possible, rewrite the code so that the 
> Random object is created once and saved, and each time a new random number is 
> required invoke a method on the existing Random object to obtain it.
> If it is important that the generated Random numbers not be guessable, you 
> must not create a new Random for each random number; the values are too 
> easily guessable. You should strongly consider using a 
> java.security.SecureRandom instead (and avoid allocating a new SecureRandom 
> for each random number needed).
> [http://findbugs.sourceforge.net/bugDescriptions.html#DMI_RANDOM_USED_ONLY_ONCE]
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Guibo-Pan commented on a change in pull request #6591: [FLINK-10186] Fix FindBugs warnings: Random object created and used o…

2018-08-23 Thread GitBox
Guibo-Pan commented on a change in pull request #6591: [FLINK-10186] Fix 
FindBugs warnings: Random object created and used o…
URL: https://github.com/apache/flink/pull/6591#discussion_r212410516
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
 ##
 @@ -115,7 +115,7 @@ public BufferSpiller(IOManager ioManager, int pageSize) 
throws IOException {
this.tempDir = tempDirs[DIRECTORY_INDEX.getAndIncrement() % 
tempDirs.length];
 
byte[] rndBytes = new byte[32];
-   new Random().nextBytes(rndBytes);
 
 Review comment:
   It's inside constructor method, and just randoms for once in an instance.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread Artsem Semianenka (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590627#comment-16590627
 ] 

Artsem Semianenka edited comment on FLINK-10203 at 8/23/18 6:24 PM:


I'm so sorry guys, this is my first time when I try to submit an issue into 
Apache project.

But I guess I have no enough rights to assign this ticket to myself.


was (Author: artsem.semianenka):
I'm so sorry guys, this is my first time when I try to submit an issue into 
Apache project. 

> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achieve this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortunately there are a few official Hadoop distributive which latest 
> version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As 
> the result Flinks Hadoop connector can't work with this distributives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread Artsem Semianenka (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590627#comment-16590627
 ] 

Artsem Semianenka commented on FLINK-10203:
---

I'm so sorry guys, this is my first time when I try to submit an issue into 
Apache project. 

> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achieve this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortunately there are a few official Hadoop distributive which latest 
> version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As 
> the result Flinks Hadoop connector can't work with this distributives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590614#comment-16590614
 ] 

ASF GitHub Bot commented on FLINK-8532:
---

Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-415513913
 
 
   I updated the code like mentioned above just now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Assignee: Guibo Pan
>Priority: Major
>  Labels: pull-request-available
>
> In some conditions, RebalancePartitioner doesn't balance data correctly 
> because it use the same value for selecting next operators.
> RebalancePartitioner initializes its partition id using the same value in 
> every threads, so it indeed balances data, but at one moment the amount of 
> data in each operator is skew.
> Particularly, when the data rate of  former operators is equal , data skew 
> becomes severe.
>  
>  
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator(map1, map2) contains three subtasks(subtask 1, 2, 3, 4, 5, 
> 6).
> map1          map2
>  st1              st4
>  st2              st5
>  st3              st6
>  
> At the beginning, every subtasks in map1 sends data to st4 in map2 because 
> they use the same initial parition id.
> Next time the map1 receive data st1,2,3 send data to st5 because they 
> increment its partition id when they processed former data.
> In my environment,  it takes twice the time to process data when I use 
> RebalancePartitioner  as long as I use other partitioners(rescale, keyby).
>  
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition

2018-08-23 Thread GitBox
Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-415513913
 
 
   I updated the code like mentioned above just now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9626) Possible resource leak in FileSystem

2018-08-23 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590612#comment-16590612
 ] 

Stephan Ewen commented on FLINK-9626:
-

Note, that this happens only if there is no other FS factory in the classpath, 
which is not commonly the case. So in practice, this should not really happen 
outside test setups.

> Possible resource leak in FileSystem
> 
>
> Key: FLINK-9626
> URL: https://issues.apache.org/jira/browse/FLINK-9626
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Critical
>
> There is a potential resource leak in 
> org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem.
> Inside it there is a code:
>  
> {code:java}
> // this "default" initialization makes sure that the FileSystem class works
> // even when not configured with an explicit Flink configuration, like on
> // JobManager or TaskManager setup
> if (FS_FACTORIES.isEmpty()) {
>initialize(new Configuration());
> }
> {code}
> which is executed on each cache miss. However this initialize method is also 
> doing
>  
>  
> {code:java}
> CACHE.clear();
> {code}
> without closing file systems in CACHE (this could be problematic for 
> HadoopFileSystem which is a wrapper around org.apache.hadoop.fs.FileSystem 
> which is closable).
> Now if for example we are constantly accessing two different file systems 
> (file systems are differentiated by combination of [schema and 
> authority|https://en.wikipedia.org/wiki/Uniform_Resource_Identifier#Generic_syntax]
>  part from the file system's URI) initialized from FALLBACK_FACTORY, each 
> time we call getUnguardedFileSystem for one of them, that call will clear 
> from CACHE entry for the other one. Thus we will constantly be creating new 
> FileSystems without closing them.
> Solution could be to either not clear the CACHE or make sure that FileSystems 
> are properly closed.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread Kostas Kloudas (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590607#comment-16590607
 ] 

Kostas Kloudas commented on FLINK-10203:


Hi [~artsem.semianenka]! 

Given that you are working on the issue, you should also assign it to yourself.

In addition, I link the discussion from the mailing list, so that we keep track 
of everything

https://mail-archives.apache.org/mod_mbox/flink-dev/201808.mbox/%3cf7af3ef2-1407-443e-a092-7b13a2467...@data-artisans.com%3E

Discussions are also better to happen in JIRA, before submitting PRs to Github. 
This is not only a personal opinion but I believe it is also a requirement from 
Apache 
as this is also Apache space, while Github is not.



> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achieve this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortunately there are a few official Hadoop distributive which latest 
> version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As 
> the result Flinks Hadoop connector can't work with this distributives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-10203:
--

Assignee: (was: Kostas Kloudas)

> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achieve this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortunately there are a few official Hadoop distributive which latest 
> version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As 
> the result Flinks Hadoop connector can't work with this distributives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-10203:
--

Assignee: Kostas Kloudas

> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achieve this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortunately there are a few official Hadoop distributive which latest 
> version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As 
> the result Flinks Hadoop connector can't work with this distributives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly

2018-08-23 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590591#comment-16590591
 ] 

Stephan Ewen commented on FLINK-10195:
--

Is that something that can be configured on RabbitMQ, for example a capacity 
bound on the queue in the QueuingConsumer?

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -
>
> Key: FLINK-10195
> URL: https://issues.apache.org/jira/browse/FLINK-10195
> Project: Flink
>  Issue Type: Bug
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>Reporter: Luka Jurukovski
>Priority: Major
>
> The connection between the RabbitMQ server and the client does not 
> appropriately back pressure when auto acking is disabled. This becomes very 
> problematic when a downstream process throttles the data processing to slower 
> then RabbitMQ sends the data to the client.
> The difference in records ends up being stored in the flink's heap space, 
> which grows indefinitely (or technically to "Integer Max" Deliveries). 
> Looking at RabbitMQ's metrics the number of unacked messages looks like 
> steadily rising saw tooth shape.
> Upon further invesitgation it looks like this is due to how the 
> QueueingConsumer works, messages are added to the BlockingQueue faster then 
> they are being removed and processed, resulting in the previously described 
> behavior.
> This may be intended behavior, however this isn't explicitly obvious in the 
> documentation or any of the examples I have seen.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590588#comment-16590588
 ] 

ASF GitHub Bot commented on FLINK-10203:


StephanEwen commented on issue #6608: [FLINK-10203]Support truncate method for 
old Hadoop versions in HadoopRecoverableFsDataOutputStream
URL: https://github.com/apache/flink/pull/6608#issuecomment-415507439
 
 
   I see, that is a fair reason. There are parallel efforts to add Parquet 
support to the old BucketingSink, but I see the point.
   
   Before going into a deep review,  can you update the description with how 
exactly the legacy truncater should be working: what copy and rename steps it 
does and how it behaves under failure / repeated calls. 
   
   Also, I would suggest to name it `Truncater` rather than `TruncateManager`. 
Too many managers all around already ;-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achieve this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortunately there are a few official Hadoop distributive which latest 
> version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As 
> the result Flinks Hadoop connector can't work with this distributives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread GitBox
StephanEwen commented on issue #6608: [FLINK-10203]Support truncate method for 
old Hadoop versions in HadoopRecoverableFsDataOutputStream
URL: https://github.com/apache/flink/pull/6608#issuecomment-415507439
 
 
   I see, that is a fair reason. There are parallel efforts to add Parquet 
support to the old BucketingSink, but I see the point.
   
   Before going into a deep review,  can you update the description with how 
exactly the legacy truncater should be working: what copy and rename steps it 
does and how it behaves under failure / repeated calls. 
   
   Also, I would suggest to name it `Truncater` rather than `TruncateManager`. 
Too many managers all around already ;-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment

2018-08-23 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590577#comment-16590577
 ] 

Stephan Ewen commented on FLINK-10202:
--

You can do that by calling {{env.setStateBackend(new 
FsStateBackend(checkpointDir))}}.

> Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
> ---
>
> Key: FLINK-10202
> URL: https://issues.apache.org/jira/browse/FLINK-10202
> Project: Flink
>  Issue Type: Improvement
>Reporter: buptljy
>Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
> run a flink job locally, and we're not able to set state.checkpoint.dir for 
> background wrapped cluster, which will cause 
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>   "checkpoints, but no checkpoint directory has been configured. You can 
> " +
>   "configure configure one via key '" + 
> ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment* 
> so that developers can use it to set state.checkpoint.dir for job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] alpinegizmo opened a new pull request #6609: [hotfix][docs] missing breaking metrics page

2018-08-23 Thread GitBox
alpinegizmo opened a new pull request #6609: [hotfix][docs] missing  
breaking metrics page
URL: https://github.com/apache/flink/pull/6609
 
 
   Adding a missing `` to metrics.md. 
   
   In both 
[master](https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#kinesis-connectors)
 and 
[release-1.6](https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html#kinesis-connectors)
 the Kinesis Connectors section of the metrics doc page is broken, and appears 
like this:
   
   Kinesis Connectors
  ## Latency tracking Flink allows to track the 
latency of rec...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8532) RebalancePartitioner should use Random value for its first partition

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590561#comment-16590561
 ] 

ASF GitHub Bot commented on FLINK-8532:
---

Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-415499383
 
 
   Hi @StephanEwen , your suggestion lead me to deep thinks, and the extreme 
performance is exactly what we want. I am going to ask you for more 
suggestions. I prefer to initialize the partitioner instance with a random 
partition, however in the design ahead, the partitioner doesn't know the target 
range.
   The alternative is like this:
   ```
   private final int[] returnArray = new int[] {new 
Random().nextInt(Integer.MAX_VALUE - 1)};
   
   @Override
   public int[] selectChannels(SerializationDelegate> record,
int numberOfOutputChannels) {
this.returnArray[0] = (this.returnArray[0] + 1) % 
numberOfOutputChannels;
return this.returnArray;
   }
   ```
Please tell me how you think, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RebalancePartitioner should use Random value for its first partition
> 
>
> Key: FLINK-8532
> URL: https://issues.apache.org/jira/browse/FLINK-8532
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Yuta Morisawa
>Assignee: Guibo Pan
>Priority: Major
>  Labels: pull-request-available
>
> In some conditions, RebalancePartitioner doesn't balance data correctly 
> because it use the same value for selecting next operators.
> RebalancePartitioner initializes its partition id using the same value in 
> every threads, so it indeed balances data, but at one moment the amount of 
> data in each operator is skew.
> Particularly, when the data rate of  former operators is equal , data skew 
> becomes severe.
>  
>  
> Example:
> Consider a simple operator chain.
> -> map1 -> rebalance -> map2 ->
> Each map operator(map1, map2) contains three subtasks(subtask 1, 2, 3, 4, 5, 
> 6).
> map1          map2
>  st1              st4
>  st2              st5
>  st3              st6
>  
> At the beginning, every subtasks in map1 sends data to st4 in map2 because 
> they use the same initial parition id.
> Next time the map1 receive data st1,2,3 send data to st5 because they 
> increment its partition id when they processed former data.
> In my environment,  it takes twice the time to process data when I use 
> RebalancePartitioner  as long as I use other partitioners(rescale, keyby).
>  
> To solve this problem, in my opinion, RebalancePartitioner should use its own 
> operator id for the initial value.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify RebalancePartitioner to use a random partition as its first partition

2018-08-23 Thread GitBox
Guibo-Pan commented on issue #6544: [FLINK-8532] [Streaming] modify 
RebalancePartitioner to use a random partition as its first partition
URL: https://github.com/apache/flink/pull/6544#issuecomment-415499383
 
 
   Hi @StephanEwen , your suggestion lead me to deep thinks, and the extreme 
performance is exactly what we want. I am going to ask you for more 
suggestions. I prefer to initialize the partitioner instance with a random 
partition, however in the design ahead, the partitioner doesn't know the target 
range.
   The alternative is like this:
   ```
   private final int[] returnArray = new int[] {new 
Random().nextInt(Integer.MAX_VALUE - 1)};
   
   @Override
   public int[] selectChannels(SerializationDelegate> record,
int numberOfOutputChannels) {
this.returnArray[0] = (this.returnArray[0] + 1) % 
numberOfOutputChannels;
return this.returnArray;
   }
   ```
Please tell me how you think, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10201) The batchTestUtil was mistakenly used in some stream sql tests

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590544#comment-16590544
 ] 

ASF GitHub Bot commented on FLINK-10201:


fhueske commented on issue #6605: [FLINK-10201] [table] [test] The 
batchTestUtil was mistakenly used in some stream sql tests
URL: https://github.com/apache/flink/pull/6605#issuecomment-415496307
 
 
   Thanks for the PR @xccui.
   +1 to merge


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The batchTestUtil was mistakenly used in some stream sql tests
> --
>
> Key: FLINK-10201
> URL: https://issues.apache.org/jira/browse/FLINK-10201
> Project: Flink
>  Issue Type: Test
>  Components: Table API & SQL
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
>  Labels: pull-request-available
>
> The {{batchTestUtil}} was mistakenly used in stream sql tests 
> {{SetOperatorsTest.testValuesWithCast()}} and 
> {{CorrelateTest.testLeftOuterJoinAsSubQuery()}}. That should be fixed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on issue #6605: [FLINK-10201] [table] [test] The batchTestUtil was mistakenly used in some stream sql tests

2018-08-23 Thread GitBox
fhueske commented on issue #6605: [FLINK-10201] [table] [test] The 
batchTestUtil was mistakenly used in some stream sql tests
URL: https://github.com/apache/flink/pull/6605#issuecomment-415496307
 
 
   Thanks for the PR @xccui.
   +1 to merge


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590520#comment-16590520
 ] 

ASF GitHub Bot commented on FLINK-10011:


azagrebin commented on a change in pull request #6587: [FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#discussion_r212321052
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ##
 @@ -237,7 +239,18 @@ public void addCheckpoint(final CompletedCheckpoint 
checkpoint) throws Exception
// Everything worked, let's remove a previous checkpoint if 
necessary.
while (completedCheckpoints.size() > 
maxNumberOfCheckpointsToRetain) {
try {
-   
removeSubsumed(completedCheckpoints.removeFirst());
+   final CompletedCheckpoint completedCheckpoint = 
completedCheckpoints.removeFirst();
 
 Review comment:
   I would try to move the whole try/catch into one method to deduplicate code 
with `shutdown()`, e.g.:
   ```
   void tryRemove(Runnable doRemove) {
 try {
   // ..
 doRemove.run(); // completedCheckpoint.discardOnSubsume(); or 
OnShutdown
   // ..
 } catch {
   // ...
 }
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Old job resurrected during HA failover
> --
>
> Key: FLINK-10011
> URL: https://issues.apache.org/jira/browse/FLINK-10011
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.2, 1.5.2, 1.6.0
>Reporter: Elias Levy
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
>
> For the second time we've observed Flink resurrect an old job during 
> JobManager high-availability fail over.
> h4. Configuration
>  * AWS environment
>  * Flink 1.4.2 standalong cluster in HA mode
>  * 2 JMs, 3 TMs
>  * 3 node ZK ensemble
>  * 1 job consuming to/from Kafka
>  * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline 
>  * 15:18:10 JM 2 completes checkpoint 69256.
>  * 15:19:10 JM 2 completes checkpoint 69257.
>  * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a 
> SocketTimeoutException
>  * 15:19:57 ZK 1 closes connection to JM 2 (leader)
>  * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 
> 1
>  * 15:19:57 JM 2 reports it can't read data from ZK
>  ** {{Unable to read additional data from server sessionid 0x3003f4a0003, 
> likely server has closed socket, closing socket connection and attempting 
> reconnect)}}
>  ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
>  * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs 
> are not monitored (temporarily).}}
>  ** {{Connection to ZooKeeper suspended. The contender 
> akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in 
> the leader election}}{{ }}
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  * 15:19:57 JM 2 gives up leadership
>  ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked 
> leadership.}}
>  * 15:19:57 JM 2 changes job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
>  ** {{Stopping checkpoint coordinator for job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
>  * 15:19:57 TMs start disasociating with JM 2, but JM 2 discard the messages 
> because there is no leader
>  ** {{Discard message 
> LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception:
>  TaskManager akka://flink/user/taskmanager is disassociating)) because there 
> is currently no valid leader id known.}}
>  * 15:19:57 JM 2 connects to ZK 2 and renews its session
>  ** {{Opening socket connection to server 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
>  ** {{Socket connection established to 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
>  ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be 
> restarted.}}
>  ** {{Session establishment complete on server 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 
> 0x3003f4a0003, ne

[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590519#comment-16590519
 ] 

ASF GitHub Bot commented on FLINK-10011:


azagrebin commented on a change in pull request #6587: [FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#discussion_r212379451
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherHATest;
+import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+   @ClassRule
+   public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new 
ZooKeeperResource();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, 
TimeUnit.SECONDS);
+
+   private static ActorSystem system;
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() throws Exception {
+   final Future terminationFuture = system.terminate();
+   Await.ready(terminationFuture, TIMEOUT);
+   }
+
+   /**
+* Tests that the {@link JobManager} releases all locked {@link 
JobGraph} if it loses
+* leadership.
+*/
+   @Test
+   public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+   final Configuration configuration = new Configuration();
+   
configuration.setString(HighAvailabilityOptions.HA_Z

[GitHub] azagrebin commented on a change in pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore

2018-08-23 Thread GitBox
azagrebin commented on a change in pull request #6587: [FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#discussion_r212379451
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherHATest;
+import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+   @ClassRule
+   public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new 
ZooKeeperResource();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, 
TimeUnit.SECONDS);
+
+   private static ActorSystem system;
+
+   @BeforeClass
+   public static void setup() {
+   system = AkkaUtils.createLocalActorSystem(new Configuration());
+   }
+
+   @AfterClass
+   public static void teardown() throws Exception {
+   final Future terminationFuture = system.terminate();
+   Await.ready(terminationFuture, TIMEOUT);
+   }
+
+   /**
+* Tests that the {@link JobManager} releases all locked {@link 
JobGraph} if it loses
+* leadership.
+*/
+   @Test
+   public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+   final Configuration configuration = new Configuration();
+   
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
ZOO_KEEPER_RESOURCE.getConnectString());
+   
configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+   final TestingHighAvailabilityServices highAvailabil

[GitHub] azagrebin commented on a change in pull request #6587: [FLINK-10011] Release JobGraph from SubmittedJobGraphStore

2018-08-23 Thread GitBox
azagrebin commented on a change in pull request #6587: [FLINK-10011] Release 
JobGraph from SubmittedJobGraphStore
URL: https://github.com/apache/flink/pull/6587#discussion_r212321052
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ##
 @@ -237,7 +239,18 @@ public void addCheckpoint(final CompletedCheckpoint 
checkpoint) throws Exception
// Everything worked, let's remove a previous checkpoint if 
necessary.
while (completedCheckpoints.size() > 
maxNumberOfCheckpointsToRetain) {
try {
-   
removeSubsumed(completedCheckpoints.removeFirst());
+   final CompletedCheckpoint completedCheckpoint = 
completedCheckpoints.removeFirst();
 
 Review comment:
   I would try to move the whole try/catch into one method to deduplicate code 
with `shutdown()`, e.g.:
   ```
   void tryRemove(Runnable doRemove) {
 try {
   // ..
 doRemove.run(); // completedCheckpoint.discardOnSubsume(); or 
OnShutdown
   // ..
 } catch {
   // ...
 }
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10136) Add REPEAT supported in Table API and SQL

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590453#comment-16590453
 ] 

ASF GitHub Bot commented on FLINK-10136:


yanghua commented on issue #6597: [FLINK-10136] [table] Add REPEAT supported in 
Table API and SQL
URL: https://github.com/apache/flink/pull/6597#issuecomment-415478057
 
 
   cc @xccui 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add REPEAT supported in Table API and SQL
> -
>
> Key: FLINK-10136
> URL: https://issues.apache.org/jira/browse/FLINK-10136
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Oracle : 
> [https://docs.oracle.com/cd/E17952_01/mysql-5.1-en/string-functions.html#function_repeat]
> MySql: 
> https://dev.mysql.com/doc/refman/5.5/en/string-functions.html#function_repeat



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6597: [FLINK-10136] [table] Add REPEAT supported in Table API and SQL

2018-08-23 Thread GitBox
yanghua commented on issue #6597: [FLINK-10136] [table] Add REPEAT supported in 
Table API and SQL
URL: https://github.com/apache/flink/pull/6597#issuecomment-415478057
 
 
   cc @xccui 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability

2018-08-23 Thread GitBox
StefanRRichter commented on issue #6604: [FLINK-9061] Optionally add entropy to 
checkpoint paths better S3 scalability
URL: https://github.com/apache/flink/pull/6604#issuecomment-415476287
 
 
   Modulo some test failures from NPEs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590443#comment-16590443
 ] 

ASF GitHub Bot commented on FLINK-9061:
---

StefanRRichter commented on issue #6604: [FLINK-9061] Optionally add entropy to 
checkpoint paths better S3 scalability
URL: https://github.com/apache/flink/pull/6604#issuecomment-415476287
 
 
   Modulo some test failures from NPEs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590401#comment-16590401
 ] 

ASF GitHub Bot commented on FLINK-10203:


art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old 
Hadoop versions in HadoopRecoverableFsDataOutputStream
URL: https://github.com/apache/flink/pull/6608#issuecomment-415465746
 
 
   @StephanEwen I try to explain you our case: the new Streaming FileSink 
ideally suitable for write into parquet files and we was so exited when founded 
it in the new 1.6 version , because we worked on our Sink implementation in 
parallel . But as I mention before we use latest Cloudera 5.15 distributive 
based on Hadoop 2.6 and unfortunately we can't upgrade it to higher version of 
Hadoop. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achieve this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortunately there are a few official Hadoop distributive which latest 
> version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As 
> the result Flinks Hadoop connector can't work with this distributives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread GitBox
art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old 
Hadoop versions in HadoopRecoverableFsDataOutputStream
URL: https://github.com/apache/flink/pull/6608#issuecomment-415465746
 
 
   @StephanEwen I try to explain you our case: the new Streaming FileSink 
ideally suitable for write into parquet files and we was so exited when founded 
it in the new 1.6 version , because we worked on our Sink implementation in 
parallel . But as I mention before we use latest Cloudera 5.15 distributive 
based on Hadoop 2.6 and unfortunately we can't upgrade it to higher version of 
Hadoop. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590380#comment-16590380
 ] 

ASF GitHub Bot commented on FLINK-9559:
---

hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of 
CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415459642
 
 
   @pnowojski Hi, thanks for your review.
   
   Yes, support VARCHAR(N) can solve the problem. As a workaround, we can cast 
the literals to varchar to solve the problem, like: `case 1 when 1 then 
cast('a' as varchar) when 2 then cast('bcd' as varchar) end`.
   
   The behavior in strict SQL standard mode(SQL:2003) returns a CHAR(N) type 
with blank-padded.
   However, I test Oracle/SqlServer/Mysql and they all return the value without 
blank-padded. Also, it seems that there is no way to turn blank-padded on. 
   This change will break implementations in Flink. Do you think we should make 
it configurable?
   
   Thanks, Hequn


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The type of a union of CHAR columns of different lengths should be VARCHAR
> --
>
> Key: FLINK-9559
> URL: https://issues.apache.org/jira/browse/FLINK-9559
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, If the case-when expression has two branches which return string 
> literal, redundant white spaces will be appended to the short string literal. 
> For example, for the sql: case 1 when 1 then 'a' when 2 then 'bcd' end, the 
> return value will be 'a ' of CHAR(3) instead of 'a'.
> Although, this follows the behavior in strict SQL standard mode(SQL:2003). We 
> should get the pragmatic return type in a real scenario without blank-padded. 
> Happily, this problem has been fixed by 
> [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], we can 
> upgrade calcite to the next release(1.17.0) and override 
> {{RelDataTypeSystem}} in flink to configure the return type, i.e., making 
> {{shouldConvertRaggedUnionTypesToVarying()}} return true.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-23 Thread GitBox
hequn8128 commented on issue #6519: [FLINK-9559] [table] The type of a union of 
CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415459642
 
 
   @pnowojski Hi, thanks for your review.
   
   Yes, support VARCHAR(N) can solve the problem. As a workaround, we can cast 
the literals to varchar to solve the problem, like: `case 1 when 1 then 
cast('a' as varchar) when 2 then cast('bcd' as varchar) end`.
   
   The behavior in strict SQL standard mode(SQL:2003) returns a CHAR(N) type 
with blank-padded.
   However, I test Oracle/SqlServer/Mysql and they all return the value without 
blank-padded. Also, it seems that there is no way to turn blank-padded on. 
   This change will break implementations in Flink. Do you think we should make 
it configurable?
   
   Thanks, Hequn


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590366#comment-16590366
 ] 

ASF GitHub Bot commented on FLINK-9061:
---

StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to 
checkpoint paths better S3 scalability
URL: https://github.com/apache/flink/pull/6604#issuecomment-415454197
 
 
   Okay, updated the implementation. Moved the entropy injection code tho the 
FS implementation, which is much more clean.
   
   PR description updated...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability

2018-08-23 Thread GitBox
StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to 
checkpoint paths better S3 scalability
URL: https://github.com/apache/flink/pull/6604#issuecomment-415454197
 
 
   Okay, updated the implementation. Moved the entropy injection code tho the 
FS implementation, which is much more clean.
   
   PR description updated...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590362#comment-16590362
 ] 

ASF GitHub Bot commented on FLINK-10203:


art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old 
Hadoop versions in HadoopRecoverableFsDataOutputStream
URL: https://github.com/apache/flink/pull/6608#issuecomment-415453555
 
 
   @StephanEwen Yes it's critical because we need to write Parquet files .


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achieve this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortunately there are a few official Hadoop distributive which latest 
> version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As 
> the result Flinks Hadoop connector can't work with this distributives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread GitBox
art4ul commented on issue #6608: [FLINK-10203]Support truncate method for old 
Hadoop versions in HadoopRecoverableFsDataOutputStream
URL: https://github.com/apache/flink/pull/6608#issuecomment-415453555
 
 
   @StephanEwen Yes it's critical because we need to write Parquet files .


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread Artsem Semianenka (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Artsem Semianenka updated FLINK-10203:
--
Description: 
New StreamingFileSink ( introduced in 1.6 Flink version ) use 
HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.

HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to have 
an ability to restore from certain point of file after failure and continue 
write data. To achieve this recover functionality the 
HadoopRecoverableFsDataOutputStream use "truncate" method which was introduced 
only in Hadoop 2.7 .

Unfortunately there are a few official Hadoop distributive which latest version 
still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As the result 
Flinks Hadoop connector can't work with this distributives.

Flink declares that supported Hadoop from version 2.4.0 upwards 
([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])

I guess we should emulate the functionality of "truncate" method for older 
Hadoop versions.

  was:
New StreamingFileSink ( introduced in 1.6 Flink version ) use 
HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.

HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to have 
an ability to restore from certain point of file after failure and continue 
write data. To achive this recover functionality the 
HadoopRecoverableFsDataOutputStream use "truncate" method which was introduced 
only in Hadoop 2.7 .

Unfortently there are a few official Hadoop distibutives which latast version 
still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the result 
Flink can't work with this distibutives.

Flink declares that supported Hadoop from version 2.4.0 upwards 
([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])

I guess we should emulate the functionality of "truncate" method for older 
Hadoop versions.


> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achieve this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortunately there are a few official Hadoop distributive which latest 
> version still use Hadoop 2.6 (This distributives: Cloudera, Pivotal HD ). As 
> the result Flinks Hadoop connector can't work with this distributives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread Artsem Semianenka (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Artsem Semianenka updated FLINK-10203:
--
Description: 
New StreamingFileSink ( introduced in 1.6 Flink version ) use 
HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.

HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to have 
an ability to restore from certain point of file after failure and continue 
write data. To achive this recover functionality the 
HadoopRecoverableFsDataOutputStream use "truncate" method which was introduced 
only in Hadoop 2.7 .

Unfortently there are a few official Hadoop distibutives which latast version 
still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the result 
Flink can't work with this distibutives.

Flink declares that supported Hadoop from version 2.4.0 upwards 
([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])

I guess we should emulate the functionality of "truncate" method for older 
Hadoop versions.

  was:
New StreamingFileSink ( introduced in 1.6 Flink version ) use 
HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.

HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to have 
an ability to restore from certain point of file after failure and continue 
write data. To achive this recover functionality the 
HadoopRecoverableFsDataOutputStream use "truncate" method which was introduced 
only in Hadoop 2.7 .

Unfortently there are a few official Hadoop distibutives which latast version 
still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the result 
Flink can't work with this distibutives.

Flink declares that supported Hadoop from version 2.4.0 upwards 
(https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions)

I guess we should emulate the functionality of "truncate" method for older 
Hadoop versions.


> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is a wrapper for FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achive this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortently there are a few official Hadoop distibutives which latast version 
> still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the 
> result Flink can't work with this distibutives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions])
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590352#comment-16590352
 ] 

ASF GitHub Bot commented on FLINK-10203:


StephanEwen commented on issue #6608: [FLINK-10203]Support truncate method for 
old Hadoop versions in HadoopRecoverableFsDataOutputStream
URL: https://github.com/apache/flink/pull/6608#issuecomment-415451715
 
 
   @art4ul Initially, we wanted to keep the new StreamingFileSink code simple 
and clean, that's why we decided to only with Hadoop 2.7+ for HDFS and retained 
the old BucketingSink, to support prior Hadoop versions.
   
   Is it a critical issue on your side to not use the BucketingSink?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achive this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortently there are a few official Hadoop distibutives which latast version 
> still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the 
> result Flink can't work with this distibutives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions)
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread GitBox
StephanEwen commented on issue #6608: [FLINK-10203]Support truncate method for 
old Hadoop versions in HadoopRecoverableFsDataOutputStream
URL: https://github.com/apache/flink/pull/6608#issuecomment-415451715
 
 
   @art4ul Initially, we wanted to keep the new StreamingFileSink code simple 
and clean, that's why we decided to only with Hadoop 2.7+ for HDFS and retained 
the old BucketingSink, to support prior Hadoop versions.
   
   Is it a critical issue on your side to not use the BucketingSink?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10198) Set Env object in DBOptions for RocksDB

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590340#comment-16590340
 ] 

ASF GitHub Bot commented on FLINK-10198:


StephanEwen commented on issue #6603: [FLINK-10198][state] Set Env object in 
DBOptions for RocksDB
URL: https://github.com/apache/flink/pull/6603#issuecomment-415448851
 
 
   This has big implication on how users configure their memory. Previously, 
there was one buffer pool per state, now there is one buffer pool total. So it 
definitely breaks configurations (lowering overall memory usage, but impacting 
performance).
   
   Is there a concrete reason to add this now, or maybe at some point when we 
revisit the overall RocksDB configuration model?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Set Env object in DBOptions for RocksDB
> ---
>
> Key: FLINK-10198
> URL: https://issues.apache.org/jira/browse/FLINK-10198
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> I think we should consider to always set a default environment when we create 
> the DBOptions.
> See https://github.com/facebook/rocksdb/wiki/rocksdb-basics:
> *Support for Multiple Embedded Databases in the same process*
> A common use-case for RocksDB is that applications inherently partition their 
> data set into logical partitions or shards. This technique benefits 
> application load balancing and fast recovery from faults. This means that a 
> single server process should be able to operate multiple RocksDB databases 
> simultaneously. This is done via an environment object named Env. Among other 
> things, a thread pool is associated with an Env. If applications want to 
> share a common thread pool (for background compactions) among multiple 
> database instances, then it should use the same Env object for opening those 
> databases.
> Similarly, multiple database instances may share the same block cache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6603: [FLINK-10198][state] Set Env object in DBOptions for RocksDB

2018-08-23 Thread GitBox
StephanEwen commented on issue #6603: [FLINK-10198][state] Set Env object in 
DBOptions for RocksDB
URL: https://github.com/apache/flink/pull/6603#issuecomment-415448851
 
 
   This has big implication on how users configure their memory. Previously, 
there was one buffer pool per state, now there is one buffer pool total. So it 
definitely breaks configurations (lowering overall memory usage, but impacting 
performance).
   
   Is there a concrete reason to add this now, or maybe at some point when we 
revisit the overall RocksDB configuration model?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590334#comment-16590334
 ] 

ASF GitHub Bot commented on FLINK-9559:
---

pnowojski edited a comment on issue #6519: [FLINK-9559] [table] The type of a 
union of CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415443589
 
 
   At least according to draft of 2008 SQL Standard (paragraph 9.3, section 3) 
iii) 3) ) the result type in this case should be `CHAR(N)` where N is the 
maximum of the inputs. If any of the inputs is `VARCHAR(x)`, the result should 
be also `VARCHAR`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The type of a union of CHAR columns of different lengths should be VARCHAR
> --
>
> Key: FLINK-9559
> URL: https://issues.apache.org/jira/browse/FLINK-9559
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, If the case-when expression has two branches which return string 
> literal, redundant white spaces will be appended to the short string literal. 
> For example, for the sql: case 1 when 1 then 'a' when 2 then 'bcd' end, the 
> return value will be 'a ' of CHAR(3) instead of 'a'.
> Although, this follows the behavior in strict SQL standard mode(SQL:2003). We 
> should get the pragmatic return type in a real scenario without blank-padded. 
> Happily, this problem has been fixed by 
> [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], we can 
> upgrade calcite to the next release(1.17.0) and override 
> {{RelDataTypeSystem}} in flink to configure the return type, i.e., making 
> {{shouldConvertRaggedUnionTypesToVarying()}} return true.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski edited a comment on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-23 Thread GitBox
pnowojski edited a comment on issue #6519: [FLINK-9559] [table] The type of a 
union of CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415443589
 
 
   At least according to draft of 2008 SQL Standard (paragraph 9.3, section 3) 
iii) 3) ) the result type in this case should be `CHAR(N)` where N is the 
maximum of the inputs. If any of the inputs is `VARCHAR(x)`, the result should 
be also `VARCHAR`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590333#comment-16590333
 ] 

ASF GitHub Bot commented on FLINK-9559:
---

pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of 
CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415443589
 
 
   At least according to draft of 2008 SQL Standard, this PR violates it 
(paragraph 9.3, section 3) iii) 3) says that the result type in this case 
should be `CHAR(N)` where N is the maximum of the inputs. If any of the inputs 
is `VARCHAR(x)`, the result should be also `VARCHAR`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The type of a union of CHAR columns of different lengths should be VARCHAR
> --
>
> Key: FLINK-9559
> URL: https://issues.apache.org/jira/browse/FLINK-9559
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, If the case-when expression has two branches which return string 
> literal, redundant white spaces will be appended to the short string literal. 
> For example, for the sql: case 1 when 1 then 'a' when 2 then 'bcd' end, the 
> return value will be 'a ' of CHAR(3) instead of 'a'.
> Although, this follows the behavior in strict SQL standard mode(SQL:2003). We 
> should get the pragmatic return type in a real scenario without blank-padded. 
> Happily, this problem has been fixed by 
> [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], we can 
> upgrade calcite to the next release(1.17.0) and override 
> {{RelDataTypeSystem}} in flink to configure the return type, i.e., making 
> {{shouldConvertRaggedUnionTypesToVarying()}} return true.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-23 Thread GitBox
pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of 
CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415443589
 
 
   At least according to draft of 2008 SQL Standard, this PR violates it 
(paragraph 9.3, section 3) iii) 3) says that the result type in this case 
should be `CHAR(N)` where N is the maximum of the inputs. If any of the inputs 
is `VARCHAR(x)`, the result should be also `VARCHAR`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10203:
---
Labels: pull-request-available  (was: )

> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achive this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortently there are a few official Hadoop distibutives which latast version 
> still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the 
> result Flink can't work with this distibutives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions)
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590284#comment-16590284
 ] 

ASF GitHub Bot commented on FLINK-10203:


art4ul opened a new pull request #6608: [FLINK-10203]Support truncate method 
for old Hadoop versions in HadoopRecoverableFsDataOutputStream
URL: https://github.com/apache/flink/pull/6608
 
 
   ## What is the purpose of the change
   
   New StreamingFileSink ( introduced in 1.6 Flink version ) use 
HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
   
   HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to 
have an ability to restore from certain point of file after failure and 
continue write data. To achive this recover functionality the 
HadoopRecoverableFsDataOutputStream use "truncate" method which was introduced 
only in Hadoop 2.7 .
   
   Unfortently there are a few official Hadoop distibutives which latast 
version still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the 
result Flink can't work with this distibutives.
   
   Flink declares that supported Hadoop from version 2.4.0 upwards 
(https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions)
   
   I guess we should emulate the functionality of "truncate" method for older 
Hadoop versions.
   
   This is possible fix . I would like to start discussion here. 
   The fix of this issue is vital for us as Hadoop 2.6 users.
   
   ## Brief change log
   
 - Add new abstraction TruncateManager
 - Add Implementation for old Hadoop version ( LegacyTruncateManager)
 - Add Implementation for Hadoop 2.7 and upwards
   
   
   ## Verifying this change
   
   This change contains only possible solution but currently without any test 
coverage.
   Tests will be added after final consensus about implementation .
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: ( no)
 - The runtime per-record code paths (performance sensitive): (don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: ( no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no, but I think it 
should be documented)
 - If yes, how is the feature documented? (not documented yet)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>  Labels: pull-request-available
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achive this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortently there are a few official Hadoop distibutives which latast version 
> still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the 
> result Flink can't work with this distibutives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions)
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] art4ul opened a new pull request #6608: [FLINK-10203]Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread GitBox
art4ul opened a new pull request #6608: [FLINK-10203]Support truncate method 
for old Hadoop versions in HadoopRecoverableFsDataOutputStream
URL: https://github.com/apache/flink/pull/6608
 
 
   ## What is the purpose of the change
   
   New StreamingFileSink ( introduced in 1.6 Flink version ) use 
HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
   
   HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to 
have an ability to restore from certain point of file after failure and 
continue write data. To achive this recover functionality the 
HadoopRecoverableFsDataOutputStream use "truncate" method which was introduced 
only in Hadoop 2.7 .
   
   Unfortently there are a few official Hadoop distibutives which latast 
version still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the 
result Flink can't work with this distibutives.
   
   Flink declares that supported Hadoop from version 2.4.0 upwards 
(https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions)
   
   I guess we should emulate the functionality of "truncate" method for older 
Hadoop versions.
   
   This is possible fix . I would like to start discussion here. 
   The fix of this issue is vital for us as Hadoop 2.6 users.
   
   ## Brief change log
   
 - Add new abstraction TruncateManager
 - Add Implementation for old Hadoop version ( LegacyTruncateManager)
 - Add Implementation for Hadoop 2.7 and upwards
   
   
   ## Verifying this change
   
   This change contains only possible solution but currently without any test 
coverage.
   Tests will be added after final consensus about implementation .
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: ( no)
 - The runtime per-record code paths (performance sensitive): (don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: ( no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no, but I think it 
should be documented)
 - If yes, how is the feature documented? (not documented yet)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10050) Support 'allowedLateness' in CoGroupedStreams

2018-08-23 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590245#comment-16590245
 ] 

Aljoscha Krettek commented on FLINK-10050:
--

I think {{DataStream.join()}} and {{DataStream.coGroup()}} are a bit of a dead 
end because they don't allow getting any information about what window the 
result is in, or other meta information about the window that you would get 
from a {{ProcessWindowFunction}}.

I'm interested if you have a use case for this, where you don't need to know 
what window your result is in. 

> Support 'allowedLateness' in CoGroupedStreams
> -
>
> Key: FLINK-10050
> URL: https://issues.apache.org/jira/browse/FLINK-10050
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.1, 1.6.0
>Reporter: eugen yushin
>Priority: Major
>  Labels: ready-to-commit, windows
>
> WindowedStream has a support of 'allowedLateness' feature, while 
> CoGroupedStreams are not. At the mean time, WindowedStream is an inner part 
> of CoGroupedStreams and all main functionality (like evictor/trigger/...) is 
> simply delegated to WindowedStream.
> There's no chance to operate with late arriving data from previous steps in 
> cogroups (and joins). Consider the following flow:
> a. read data from source1 -> aggregate data with allowed lateness
> b. read data from source2 -> aggregate data with allowed lateness
> c. cogroup/join streams a and b, and compare aggregated values
> Step c doesn't accept any late data from steps a/b due to lack of 
> `allowedLateness` API call in CoGroupedStreams.java.
> Scope: add method `WithWindow.allowedLateness` to Java API 
> (flink-streaming-java) and extend scala API (flink-streaming-scala).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9559) The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590230#comment-16590230
 ] 

ASF GitHub Bot commented on FLINK-9559:
---

pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of 
CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415420332
 
 
   What does the SQL standard have to say about this? MySQL is not the most 
standard compliant db out there.
   
   Maybe the problem here is that string literals are of `CHAR(N)` type, while 
we do not support `CHAR` anywhere else? Maybe our literals should also have a 
type of `VARCHAR(N)`? That would also solve this problem.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The type of a union of CHAR columns of different lengths should be VARCHAR
> --
>
> Key: FLINK-9559
> URL: https://issues.apache.org/jira/browse/FLINK-9559
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, If the case-when expression has two branches which return string 
> literal, redundant white spaces will be appended to the short string literal. 
> For example, for the sql: case 1 when 1 then 'a' when 2 then 'bcd' end, the 
> return value will be 'a ' of CHAR(3) instead of 'a'.
> Although, this follows the behavior in strict SQL standard mode(SQL:2003). We 
> should get the pragmatic return type in a real scenario without blank-padded. 
> Happily, this problem has been fixed by 
> [CALCITE-2321|https://issues.apache.org/jira/browse/CALCITE-2321], we can 
> upgrade calcite to the next release(1.17.0) and override 
> {{RelDataTypeSystem}} in flink to configure the return type, i.e., making 
> {{shouldConvertRaggedUnionTypesToVarying()}} return true.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of CHAR columns of different lengths should be VARCHAR

2018-08-23 Thread GitBox
pnowojski commented on issue #6519: [FLINK-9559] [table] The type of a union of 
CHAR columns of different lengths should be VARCHAR
URL: https://github.com/apache/flink/pull/6519#issuecomment-415420332
 
 
   What does the SQL standard have to say about this? MySQL is not the most 
standard compliant db out there.
   
   Maybe the problem here is that string literals are of `CHAR(N)` type, while 
we do not support `CHAR` anywhere else? Maybe our literals should also have a 
type of `VARCHAR(N)`? That would also solve this problem.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10203) Support truncate method for old Hadoop versions in HadoopRecoverableFsDataOutputStream

2018-08-23 Thread Artsem Semianenka (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Artsem Semianenka updated FLINK-10203:
--
Summary: Support truncate method for old Hadoop versions in 
HadoopRecoverableFsDataOutputStream  (was: HadoopRecoverableFsDataOutputStream 
does not support Hadoop 2.6)

> Support truncate method for old Hadoop versions in 
> HadoopRecoverableFsDataOutputStream
> --
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achive this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortently there are a few official Hadoop distibutives which latast version 
> still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the 
> result Flink can't work with this distibutives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions)
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10203) HadoopRecoverableFsDataOutputStream does not support Hadoop 2.6

2018-08-23 Thread Artsem Semianenka (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Artsem Semianenka updated FLINK-10203:
--
Summary: HadoopRecoverableFsDataOutputStream does not support Hadoop 2.6  
(was: HadoopRecoverableWriter does not support Hadoop )

> HadoopRecoverableFsDataOutputStream does not support Hadoop 2.6
> ---
>
> Key: FLINK-10203
> URL: https://issues.apache.org/jira/browse/FLINK-10203
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, filesystem-connector
>Affects Versions: 1.6.0, 1.6.1, 1.7.0
>Reporter: Artsem Semianenka
>Priority: Major
>
> New StreamingFileSink ( introduced in 1.6 Flink version ) use 
> HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.
> HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to 
> have an ability to restore from certain point of file after failure and 
> continue write data. To achive this recover functionality the 
> HadoopRecoverableFsDataOutputStream use "truncate" method which was 
> introduced only in Hadoop 2.7 .
> Unfortently there are a few official Hadoop distibutives which latast version 
> still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the 
> result Flink can't work with this distibutives.
> Flink declares that supported Hadoop from version 2.4.0 upwards 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions)
> I guess we should emulate the functionality of "truncate" method for older 
> Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10203) HadoopRecoverableWriter does not support Hadoop

2018-08-23 Thread Artsem Semianenka (JIRA)
Artsem Semianenka created FLINK-10203:
-

 Summary: HadoopRecoverableWriter does not support Hadoop 
 Key: FLINK-10203
 URL: https://issues.apache.org/jira/browse/FLINK-10203
 Project: Flink
  Issue Type: Bug
  Components: DataStream API, filesystem-connector
Affects Versions: 1.6.0, 1.6.1, 1.7.0
Reporter: Artsem Semianenka


New StreamingFileSink ( introduced in 1.6 Flink version ) use 
HadoopRecoverableFsDataOutputStream wrapper to write data in HDFS.

HadoopRecoverableFsDataOutputStream is an wrapper on FSDataOutputStream to have 
an ability to restore from certain point of file after failure and continue 
write data. To achive this recover functionality the 
HadoopRecoverableFsDataOutputStream use "truncate" method which was introduced 
only in Hadoop 2.7 .

Unfortently there are a few official Hadoop distibutives which latast version 
still use Hadoop 2.6 (This distibutives: Cloudera, Pivotal HD ). As the result 
Flink can't work with this distibutives.

Flink declares that supported Hadoop from version 2.4.0 upwards 
(https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#hadoop-versions)

I guess we should emulate the functionality of "truncate" method for older 
Hadoop versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] FredTing commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer

2018-08-23 Thread GitBox
FredTing commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r212309444
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -45,6 +45,22 @@
 */
T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset) throws IOException;
 
+   /**
+* Deserializes the byte message.
+*
+* @param messageKey the key as a byte array (null if no key has been 
set).
+* @param message The message, as a byte array (null if the message was 
empty or deleted).
+* @param partition The partition the message has originated from.
+* @param offset the offset of the message in the original source (for 
example the Kafka offset).
+* @param timestamp the timestamp of the consumer record
+* @param timestampType The timestamp type, could be NO_TIMESTAMP, 
CREATE_TIME or INGEST_TIME.
+*
+* @return The deserialized message as an object (null if the message 
cannot be deserialized).
+*/
+   default T deserialize(byte[] messageKey, byte[] message, String topic, 
int partition, long offset, long timestamp, TimestampType timestampType) throws 
IOException {
 
 Review comment:
   I've pushed the changes as we discussed above.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590215#comment-16590215
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

FredTing commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r212309444
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -45,6 +45,22 @@
 */
T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset) throws IOException;
 
+   /**
+* Deserializes the byte message.
+*
+* @param messageKey the key as a byte array (null if no key has been 
set).
+* @param message The message, as a byte array (null if the message was 
empty or deleted).
+* @param partition The partition the message has originated from.
+* @param offset the offset of the message in the original source (for 
example the Kafka offset).
+* @param timestamp the timestamp of the consumer record
+* @param timestampType The timestamp type, could be NO_TIMESTAMP, 
CREATE_TIME or INGEST_TIME.
+*
+* @return The deserialized message as an object (null if the message 
cannot be deserialized).
+*/
+   default T deserialize(byte[] messageKey, byte[] message, String topic, 
int partition, long offset, long timestamp, TimestampType timestampType) throws 
IOException {
 
 Review comment:
   I've pushed the changes as we discussed above.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10198) Set Env object in DBOptions for RocksDB

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590108#comment-16590108
 ] 

ASF GitHub Bot commented on FLINK-10198:


NicoK commented on issue #6603: [FLINK-10198][state] Set Env object in 
DBOptions for RocksDB
URL: https://github.com/apache/flink/pull/6603#issuecomment-415383492
 
 
   In general, I like this idea, because before we would potentially end up 
with a log of additional RocksDB threads that might not be needed - they are 
all probably blocking on the same disk anyway!
   
   Did you measure the performance changes in some scenarios (1 RocksDB 
instances vs. 10 or even more) to verify that we are actually not making things 
worse with this change?
   
   Previously, with (for example) `FLASH_SSD_OPTIMIZED`, every RocksDB instance 
was using 4 threads and now all together will use 4?
   -> Should we tweak the default settings for the number of threads, also in 
the other profiles?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Set Env object in DBOptions for RocksDB
> ---
>
> Key: FLINK-10198
> URL: https://issues.apache.org/jira/browse/FLINK-10198
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.7.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>
> I think we should consider to always set a default environment when we create 
> the DBOptions.
> See https://github.com/facebook/rocksdb/wiki/rocksdb-basics:
> *Support for Multiple Embedded Databases in the same process*
> A common use-case for RocksDB is that applications inherently partition their 
> data set into logical partitions or shards. This technique benefits 
> application load balancing and fast recovery from faults. This means that a 
> single server process should be able to operate multiple RocksDB databases 
> simultaneously. This is done via an environment object named Env. Among other 
> things, a thread pool is associated with an Env. If applications want to 
> share a common thread pool (for background compactions) among multiple 
> database instances, then it should use the same Env object for opening those 
> databases.
> Similarly, multiple database instances may share the same block cache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK commented on issue #6603: [FLINK-10198][state] Set Env object in DBOptions for RocksDB

2018-08-23 Thread GitBox
NicoK commented on issue #6603: [FLINK-10198][state] Set Env object in 
DBOptions for RocksDB
URL: https://github.com/apache/flink/pull/6603#issuecomment-415383492
 
 
   In general, I like this idea, because before we would potentially end up 
with a log of additional RocksDB threads that might not be needed - they are 
all probably blocking on the same disk anyway!
   
   Did you measure the performance changes in some scenarios (1 RocksDB 
instances vs. 10 or even more) to verify that we are actually not making things 
worse with this change?
   
   Previously, with (for example) `FLASH_SSD_OPTIMIZED`, every RocksDB instance 
was using 4 threads and now all together will use 4?
   -> Should we tweak the default settings for the number of threads, also in 
the other profiles?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9391) Support UNNEST in Table API

2018-08-23 Thread Mikhail (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590055#comment-16590055
 ] 

Mikhail commented on FLINK-9391:


Hello [~twalthr],

As I know, [~ipatina] is on another project. So I'd like to continue Alina's 
work.

I already found out that if we want to select with unnest from table:
{code:java}
@Test
def testUnnest(): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val data = new mutable.MutableList[(Int, Long, Array[String])]
data.+=((1, 1L, Array("Hi", "w")))
data.+=((2, 2L, Array("Hello", "k")))
data.+=((3, 2L, Array("Hello world", "x")))
val input = env.fromCollection(Random.shuffle(data)).toTable(tEnv).as('a, 'b, 
'c)

val unnested = input.select('a, 'b, 'c.unnest())

val actual = unnested.toDataSet[Row].collect()

val expected = List("1,Hi", "1,w", "2,Hello", "2,k", "3,Hello world", 
"3,x").mkString("\n")
TestBaseUtils.compareResultAsText(actual.asJava, expected)
}
{code}
Then there will be generated code for a function in DataSetCalc#translateToPlan 
for processing input data. That code will be compiled and executed.

That code will be generated using CommonCalc#generateFunction which will 
process each element in a Row separately.

So final output for a Row in a table will contain single Row.

But for our case with UNNEST we need to have another flow.

[~twalthr], could you please suggest where to look at?

> Support UNNEST in Table API
> ---
>
> Key: FLINK-9391
> URL: https://issues.apache.org/jira/browse/FLINK-9391
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Alina Ipatina
>Priority: Major
>
> FLINK-6033 introduced the UNNEST function for SQL. We should also add this 
> function to the Table API to keep the APIs in sync. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol closed pull request #6607: [docs] Remove the redundant tag

2018-08-23 Thread GitBox
zentol closed pull request #6607: [docs] Remove the redundant  tag
URL: https://github.com/apache/flink/pull/6607
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index f1afa4e93b2..a93853c14dc 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1223,7 +1223,7 @@ Thus, in order to infer the metric identifier:
   Histogram
 
 
-  Task
+  Task
   numBytesInLocal
   The total number of bytes this task has read from a local 
source.
   Counter
@@ -1244,7 +1244,6 @@ Thus, in order to infer the metric identifier:
   Meter
 
 
-  Task
   numBuffersInLocal
   The total number of network buffers this task has read from a local 
source.
   Counter


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9713) Support versioned joins in planning phase

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590035#comment-16590035
 ] 

ASF GitHub Bot commented on FLINK-9713:
---

pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r212257550
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableVersionFunction.scala
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.Table
+import org.apache.flink.types.Row
+
+/**
+  * Class represnting table version functions over some history table.
+  */
+class TableVersionFunction private(
+@transient private val _table: Table,
+private[flink] val versionField: String,
+private[flink] val primaryKey: String,
+private[flink] val resultType: RowTypeInfo,
+private[flink] val isProctime: Boolean)
+  extends TableFunction[Row] {
+
+  def eval(row: Timestamp): Unit = {
 
 Review comment:
   Without it I'm getting:
   ```
   org.apache.flink.table.api.ValidationException: Function class 
'org.apache.flink.table.functions.TemporalTableFunction' does not implement at 
least one method named 'eval' which is public, not abstract and (in case of 
table functions) not static.
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support versioned joins in planning phase
> -
>
> Key: FLINK-9713
> URL: https://issues.apache.org/jira/browse/FLINK-9713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should evaluate to valid plan with versioned joins plan node.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] Support processing time versioned joins

2018-08-23 Thread GitBox
pnowojski commented on a change in pull request #6299: [FLINK-9713][table][sql] 
Support processing time versioned joins
URL: https://github.com/apache/flink/pull/6299#discussion_r212257550
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableVersionFunction.scala
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.Table
+import org.apache.flink.types.Row
+
+/**
+  * Class represnting table version functions over some history table.
+  */
+class TableVersionFunction private(
+@transient private val _table: Table,
+private[flink] val versionField: String,
+private[flink] val primaryKey: String,
+private[flink] val resultType: RowTypeInfo,
+private[flink] val isProctime: Boolean)
+  extends TableFunction[Row] {
+
+  def eval(row: Timestamp): Unit = {
 
 Review comment:
   Without it I'm getting:
   ```
   org.apache.flink.table.api.ValidationException: Function class 
'org.apache.flink.table.functions.TemporalTableFunction' does not implement at 
least one method named 'eval' which is public, not abstract and (in case of 
table functions) not static.
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590017#comment-16590017
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r212252904
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -45,6 +45,22 @@
 */
T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset) throws IOException;
 
+   /**
+* Deserializes the byte message.
+*
+* @param messageKey the key as a byte array (null if no key has been 
set).
+* @param message The message, as a byte array (null if the message was 
empty or deleted).
+* @param partition The partition the message has originated from.
+* @param offset the offset of the message in the original source (for 
example the Kafka offset).
+* @param timestamp the timestamp of the consumer record
+* @param timestampType The timestamp type, could be NO_TIMESTAMP, 
CREATE_TIME or INGEST_TIME.
+*
+* @return The deserialized message as an object (null if the message 
cannot be deserialized).
+*/
+   default T deserialize(byte[] messageKey, byte[] message, String topic, 
int partition, long offset, long timestamp, TimestampType timestampType) throws 
IOException {
 
 Review comment:
   I would be ok with proceeding with the above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer

2018-08-23 Thread GitBox
tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r212252904
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -45,6 +45,22 @@
 */
T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset) throws IOException;
 
+   /**
+* Deserializes the byte message.
+*
+* @param messageKey the key as a byte array (null if no key has been 
set).
+* @param message The message, as a byte array (null if the message was 
empty or deleted).
+* @param partition The partition the message has originated from.
+* @param offset the offset of the message in the original source (for 
example the Kafka offset).
+* @param timestamp the timestamp of the consumer record
+* @param timestampType The timestamp type, could be NO_TIMESTAMP, 
CREATE_TIME or INGEST_TIME.
+*
+* @return The deserialized message as an object (null if the message 
cannot be deserialized).
+*/
+   default T deserialize(byte[] messageKey, byte[] message, String topic, 
int partition, long offset, long timestamp, TimestampType timestampType) throws 
IOException {
 
 Review comment:
   I would be ok with proceeding with the above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9061) add entropy to s3 path for better scalability

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590003#comment-16590003
 ] 

ASF GitHub Bot commented on FLINK-9061:
---

StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to 
checkpoint paths better S3 scalability
URL: https://github.com/apache/flink/pull/6604#issuecomment-415362329
 
 
   Good points - these were actually problems in the original PR as well...
   Will try to come up with a solution for that...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to checkpoint paths better S3 scalability

2018-08-23 Thread GitBox
StephanEwen commented on issue #6604: [FLINK-9061] Optionally add entropy to 
checkpoint paths better S3 scalability
URL: https://github.com/apache/flink/pull/6604#issuecomment-415362329
 
 
   Good points - these were actually problems in the original PR as well...
   Will try to come up with a solution for that...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kisimple opened a new pull request #6607: [docs] Remove the redundant tag

2018-08-23 Thread GitBox
kisimple opened a new pull request #6607: [docs] Remove the redundant  tag
URL: https://github.com/apache/flink/pull/6607
 
 
   Remove the redundant `` tag in 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589904#comment-16589904
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

aljoscha commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r21385
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -45,6 +45,22 @@
 */
T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset) throws IOException;
 
+   /**
+* Deserializes the byte message.
+*
+* @param messageKey the key as a byte array (null if no key has been 
set).
+* @param message The message, as a byte array (null if the message was 
empty or deleted).
+* @param partition The partition the message has originated from.
+* @param offset the offset of the message in the original source (for 
example the Kafka offset).
+* @param timestamp the timestamp of the consumer record
+* @param timestampType The timestamp type, could be NO_TIMESTAMP, 
CREATE_TIME or INGEST_TIME.
+*
+* @return The deserialized message as an object (null if the message 
cannot be deserialized).
+*/
+   default T deserialize(byte[] messageKey, byte[] message, String topic, 
int partition, long offset, long timestamp, TimestampType timestampType) throws 
IOException {
 
 Review comment:
   Yes, to unblock this I thing we can go with this approach, basically the 
schema becomes this:
   ```
   @PublicEvolving
   public interface KeyedDeserializationSchema extends Serializable, 
ResultTypeQueryable {
   
   @Deprecated
default T deserialize(byte[] messageKey, byte[] message, String topic, 
int partition, long offset) {
   throw new RuntimeException("blammo");
   }
   
   default T deserialize(byte[] messageKey, byte[] message, String 
topic, int partition, long offset, long timestamp) {
   return deserialize(/* call the other method */);
   } 
boolean isEndOfStream(T nextElement);
   }
   ```
   
   With this, if you have an existing implementation of 
`KeyedDeserializationSchema` it will continue to work without any changes. If 
you implement a new one you have to implement one of the methods, otherwise the 
exception is thrown. And all Flink code only calls the version that takes the 
timestamp.
   
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] aljoscha commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer

2018-08-23 Thread GitBox
aljoscha commented on a change in pull request #6105: [FLINK-8500] Get the 
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r21385
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##
 @@ -45,6 +45,22 @@
 */
T deserialize(byte[] messageKey, byte[] message, String topic, int 
partition, long offset) throws IOException;
 
+   /**
+* Deserializes the byte message.
+*
+* @param messageKey the key as a byte array (null if no key has been 
set).
+* @param message The message, as a byte array (null if the message was 
empty or deleted).
+* @param partition The partition the message has originated from.
+* @param offset the offset of the message in the original source (for 
example the Kafka offset).
+* @param timestamp the timestamp of the consumer record
+* @param timestampType The timestamp type, could be NO_TIMESTAMP, 
CREATE_TIME or INGEST_TIME.
+*
+* @return The deserialized message as an object (null if the message 
cannot be deserialized).
+*/
+   default T deserialize(byte[] messageKey, byte[] message, String topic, 
int partition, long offset, long timestamp, TimestampType timestampType) throws 
IOException {
 
 Review comment:
   Yes, to unblock this I thing we can go with this approach, basically the 
schema becomes this:
   ```
   @PublicEvolving
   public interface KeyedDeserializationSchema extends Serializable, 
ResultTypeQueryable {
   
   @Deprecated
default T deserialize(byte[] messageKey, byte[] message, String topic, 
int partition, long offset) {
   throw new RuntimeException("blammo");
   }
   
   default T deserialize(byte[] messageKey, byte[] message, String 
topic, int partition, long offset, long timestamp) {
   return deserialize(/* call the other method */);
   } 
boolean isEndOfStream(T nextElement);
   }
   ```
   
   With this, if you have an existing implementation of 
`KeyedDeserializationSchema` it will continue to work without any changes. If 
you implement a new one you have to implement one of the methods, otherwise the 
exception is thrown. And all Flink code only calls the version that takes the 
timestamp.
   
   What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment

2018-08-23 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy updated FLINK-10202:

Description: 
Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
run a flink job locally, and we're not able to set state.checkpoint.dir for 
background wrapped cluster, which will cause 
{code:java}
throw new IllegalStateException("CheckpointConfig says to persist periodic " +
  "checkpoints, but no checkpoint directory has been configured. You can " +
  "configure configure one via key '" + 
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
{code}
I wonder if we could provide a public method in *StreamExecutionEnvironment* so 
that developers can use it to set state.checkpoint.dir for job.

  was:
Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
run a flink job locally, and we're not able to set state.checkpoint.dir, which 
will cause 
{code:java}
throw new IllegalStateException("CheckpointConfig says to persist periodic " +
  "checkpoints, but no checkpoint directory has been configured. You can " +
  "configure configure one via key '" + 
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
{code}
I wonder if we could provide a public method in *StreamExecutionEnvironment* so 
that developers can use it to set state.checkpoint.dir for job.


> Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
> ---
>
> Key: FLINK-10202
> URL: https://issues.apache.org/jira/browse/FLINK-10202
> Project: Flink
>  Issue Type: Improvement
>Reporter: buptljy
>Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
> run a flink job locally, and we're not able to set state.checkpoint.dir for 
> background wrapped cluster, which will cause 
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>   "checkpoints, but no checkpoint directory has been configured. You can 
> " +
>   "configure configure one via key '" + 
> ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment* 
> so that developers can use it to set state.checkpoint.dir for job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10202) Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment

2018-08-23 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy updated FLINK-10202:

Summary: Enable configuration for state.checkpoint.dir in 
StreamExecutionEnvironment  (was: Enable configuration for state.checkpoint.dir 
in environment)

> Enable configuration for state.checkpoint.dir in StreamExecutionEnvironment
> ---
>
> Key: FLINK-10202
> URL: https://issues.apache.org/jira/browse/FLINK-10202
> Project: Flink
>  Issue Type: Improvement
>Reporter: buptljy
>Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
> run a flink job locally, and we're not able to set state.checkpoint.dir, 
> which will cause 
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>   "checkpoints, but no checkpoint directory has been configured. You can 
> " +
>   "configure configure one via key '" + 
> ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment* 
> so that developers can use it to set state.checkpoint.dir for job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10202) Enable configuration for state.checkpoint.dir with environment

2018-08-23 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy updated FLINK-10202:

Description: 
Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
run a flink job locally, and we're not able to set state.checkpoint.dir, which 
will cause 
{code:java}
throw new IllegalStateException("CheckpointConfig says to persist periodic " +
  "checkpoints, but no checkpoint directory has been configured. You can " +
  "configure configure one via key '" + 
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
{code}
I wonder if we could provide a public method in *StreamExecutionEnvironment* so 
that developers can use it to set state.checkpoint.dir for job.

  was:
Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
run a flink job locally, and we're not able to set state.checkpoint.dir, which 
will cause 
{code:java}
throw new IllegalStateException("CheckpointConfig says to persist periodic " +
  "checkpoints, but no checkpoint directory has been configured. You can " +
  "configure configure one via key '" + 
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");

{code}


> Enable configuration for state.checkpoint.dir with environment
> --
>
> Key: FLINK-10202
> URL: https://issues.apache.org/jira/browse/FLINK-10202
> Project: Flink
>  Issue Type: Improvement
>Reporter: buptljy
>Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
> run a flink job locally, and we're not able to set state.checkpoint.dir, 
> which will cause 
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>   "checkpoints, but no checkpoint directory has been configured. You can 
> " +
>   "configure configure one via key '" + 
> ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment* 
> so that developers can use it to set state.checkpoint.dir for job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10202) Enable configuration for state.checkpoint.dir in environment

2018-08-23 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy updated FLINK-10202:

Summary: Enable configuration for state.checkpoint.dir in environment  
(was: Enable configuration for state.checkpoint.dir with environment)

> Enable configuration for state.checkpoint.dir in environment
> 
>
> Key: FLINK-10202
> URL: https://issues.apache.org/jira/browse/FLINK-10202
> Project: Flink
>  Issue Type: Improvement
>Reporter: buptljy
>Priority: Major
>
> Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
> run a flink job locally, and we're not able to set state.checkpoint.dir, 
> which will cause 
> {code:java}
> throw new IllegalStateException("CheckpointConfig says to persist periodic " +
>   "checkpoints, but no checkpoint directory has been configured. You can 
> " +
>   "configure configure one via key '" + 
> ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
> {code}
> I wonder if we could provide a public method in *StreamExecutionEnvironment* 
> so that developers can use it to set state.checkpoint.dir for job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10202) Enable configuration for state.checkpoint.dir with environment

2018-08-23 Thread buptljy (JIRA)
buptljy created FLINK-10202:
---

 Summary: Enable configuration for state.checkpoint.dir with 
environment
 Key: FLINK-10202
 URL: https://issues.apache.org/jira/browse/FLINK-10202
 Project: Flink
  Issue Type: Improvement
Reporter: buptljy


Usually we can set state.checkpoint.dir in flink-conf.yaml, but sometimes we 
run a flink job locally, and we're not able to set state.checkpoint.dir, which 
will cause 
{code:java}
throw new IllegalStateException("CheckpointConfig says to persist periodic " +
  "checkpoints, but no checkpoint directory has been configured. You can " +
  "configure configure one via key '" + 
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");

{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9693) Possible memory leak in jobmanager retaining archived checkpoints

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589897#comment-16589897
 ] 

ASF GitHub Bot commented on FLINK-9693:
---

TisonKun commented on issue #6251: [FLINK-9693] Set Execution#taskRestore to 
null after deployment
URL: https://github.com/apache/flink/pull/6251#issuecomment-415333090
 
 
   Thanks till, this saves my weekend :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Possible memory leak in jobmanager retaining archived checkpoints
> -
>
> Key: FLINK-9693
> URL: https://issues.apache.org/jira/browse/FLINK-9693
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
> Environment: !image.png!!image (1).png!
>Reporter: Steven Zhen Wu
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.4.3, 1.5.3, 1.6.1, 1.7.0
>
> Attachments: 20180725_jm_mem_leak.png, 
> 41K_ExecutionVertex_objs_retained_9GB.png, ExecutionVertexZoomIn.png
>
>
> First, some context about the job
>  * Flink 1.4.1
>  * stand-alone deployment mode
>  * embarrassingly parallel: all operators are chained together
>  * parallelism is over 1,000
>  * stateless except for Kafka source operators. checkpoint size is 8.4 MB.
>  * set "state.backend.fs.memory-threshold" so that only jobmanager writes to 
> S3 to checkpoint
>  * internal checkpoint with 10 checkpoints retained in history
>  
> Summary of the observations
>  * 41,567 ExecutionVertex objects retained 9+ GB of memory
>  * Expanded in one ExecutionVertex. it seems to storing the kafka offsets for 
> source operator



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6251: [FLINK-9693] Set Execution#taskRestore to null after deployment

2018-08-23 Thread GitBox
TisonKun commented on issue #6251: [FLINK-9693] Set Execution#taskRestore to 
null after deployment
URL: https://github.com/apache/flink/pull/6251#issuecomment-415333090
 
 
   Thanks till, this saves my weekend :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


  1   2   >