[GitHub] [flink] tzulitai edited a comment on issue #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai edited a comment on issue #8570: [FLINK-12688] [state] Make serializer 
lazy initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#issuecomment-497585843
 
 
   One slight clarification regarding a comment that was mentioned in the 
description:
   ```
   The change here assures only one single serializer instance will be returned 
by StateDescriptor.getSerializer after lazy initialization while previously no 
such assurance.
   ```
   If I understand this correctly, that statement is incorrect: `getSerializer` always creates a duplicate of the wrapped serializer. This is required because `TypeSerializer` is not thread-safe.
   So, with this change, the returned serializer is still always a duplicate, but the one maintained within `StateDescriptor` is a singleton.
   
   @carp84 can you confirm this?
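   
   A simplified, hypothetical sketch of the duplicate-vs-singleton behavior described above (class and field names are illustrative, not the actual `StateDescriptor` code in this PR):
   ```java
   import org.apache.flink.api.common.ExecutionConfig;
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   
   import java.util.concurrent.atomic.AtomicReference;
   
   // Hypothetical, simplified holder illustrating the point: one lazily
   // initialized serializer instance is kept internally, while callers
   // always receive duplicates of it.
   public class LazySerializerHolder<T> {
   
       private final TypeInformation<T> typeInfo;
       private final AtomicReference<TypeSerializer<T>> serializerRef = new AtomicReference<>();
   
       public LazySerializerHolder(TypeInformation<T> typeInfo) {
           this.typeInfo = typeInfo;
       }
   
       public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
           if (serializerRef.get() == null) {
               // compareAndSet ensures exactly one instance "wins" under concurrent initialization.
               serializerRef.compareAndSet(null, typeInfo.createSerializer(executionConfig));
           }
       }
   
       public TypeSerializer<T> getSerializer() {
           TypeSerializer<T> serializer = serializerRef.get();
           if (serializer == null) {
               throw new IllegalStateException("Serializer not yet initialized.");
           }
           // TypeSerializer is not thread-safe, so callers always get a duplicate
           // of the single wrapped instance.
           return serializer.duplicate();
       }
   }
   ```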


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai edited a comment on issue #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai edited a comment on issue #8570: [FLINK-12688] [state] Make serializer 
lazy initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#issuecomment-497585843
 
 
   One slight clarification regarding a comment that was mentioned in the 
description:
   ```
   The change here assures only one single serializer instance will be returned 
by StateDescriptor.getSerializer after lazy initialization while previously no 
such assurance.
   ```
   If I understand this correctly, that statement is incorrect: `getSerializer` always creates a duplicate of the wrapped serializer. This is required because `TypeSerializer` is not thread-safe.
   So, with this change, the returned serializer is still always a duplicate, but the one maintained within `StateDescriptor` is always a singleton.
   
   @carp84 can you confirm this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai edited a comment on issue #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai edited a comment on issue #8570: [FLINK-12688] [state] Make serializer 
lazy initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#issuecomment-497585843
 
 
   One slight clarification regarding a comment that was mentioned in the 
description:
   ```
   The change here assures only one single serializer instance will be returned 
by StateDescriptor.getSerializer after lazy initialization while previously no 
such assurance.
   ```
   If I understand this correctly, that statement is incorrect: `getSerializer` always creates a duplicate of the wrapped serializer. This is required because `TypeSerializer` is not thread-safe.
   So, the returned serializer is always a duplicate, but the one maintained within `StateDescriptor` is always a singleton.
   
   @carp84 can you confirm this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on issue #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai commented on issue #8570: [FLINK-12688] [state] Make serializer lazy 
initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#issuecomment-497585843
 
 
   One slight clarification regarding a comment that was mentioned in the 
description:
   ```
   The change here assures only one single serializer instance will be returned 
by StateDescriptor.getSerializer after lazy initialization while previously no 
such assurance.
   ```
   If I understand this correctly, that statement is incorrect: `getSerializer` always creates a duplicate of the wrapped serializer. This is required because `TypeSerializer` is not thread-safe.
   So, the returned serializer is always a duplicate, but the one maintained within `StateDescriptor` is always a singleton.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] liyafan82 commented on issue #8579: [FLINK-12687][Runtime / Operators] ByteHashSet is always in dense mode

2019-05-30 Thread GitBox
liyafan82 commented on issue #8579: [FLINK-12687][Runtime / Operators] 
ByteHashSet is always in dense mode
URL: https://github.com/apache/flink/pull/8579#issuecomment-497585646
 
 
   Hi @JingsongLi, would you please take a look?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289264560
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Default {@link ShuffleMaster} for netty and local file based shuffle 
implementation.
+ */
+public enum NettyShuffleMaster implements 
ShuffleMaster {
+   INSTANCE;
+
+   @Override
+   public CompletableFuture 
registerPartitionWithProducer(
+   PartitionDescriptor partitionDescriptor,
+   ProducerDescriptor producerDescriptor) {
+   ResultPartitionID resultPartitionID = new ResultPartitionID(
+   partitionDescriptor.getPartitionId(),
+   producerDescriptor.getProducerExecutionId());
+   NettyShuffleDescriptor shuffleDeploymentDescriptor = new 
NettyShuffleDescriptor(
+   producerDescriptor.getProducerResourceId(),
+   createProducerLocation(producerDescriptor, 
partitionDescriptor.getConnectionIndex()),
+   resultPartitionID);
+   return 
CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
+   }
+
+   private static NettyShuffleDescriptor.PartitionLocation 
createProducerLocation(
 
 Review comment:
   why use static here?
   createProducerLocation -> createPartitionLocation


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289264560
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Default {@link ShuffleMaster} for netty and local file based shuffle 
implementation.
+ */
+public enum NettyShuffleMaster implements 
ShuffleMaster {
+   INSTANCE;
+
+   @Override
+   public CompletableFuture 
registerPartitionWithProducer(
+   PartitionDescriptor partitionDescriptor,
+   ProducerDescriptor producerDescriptor) {
+   ResultPartitionID resultPartitionID = new ResultPartitionID(
+   partitionDescriptor.getPartitionId(),
+   producerDescriptor.getProducerExecutionId());
+   NettyShuffleDescriptor shuffleDeploymentDescriptor = new 
NettyShuffleDescriptor(
+   producerDescriptor.getProducerResourceId(),
+   createProducerLocation(producerDescriptor, 
partitionDescriptor.getConnectionIndex()),
+   resultPartitionID);
+   return 
CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
+   }
+
+   private static NettyShuffleDescriptor.PartitionLocation 
createProducerLocation(
 
 Review comment:
   why use static here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] 
Make serializer lazy initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#discussion_r289262686
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
 ##
 @@ -200,6 +204,31 @@ public void testEqualsSameNameAndTypeDifferentClass() 
throws Exception {
assertNotEquals(descr1, descr2);
}
 
+   @Test
+   public void testSerializerLazyInitializeInParallel() throws Exception {
+   final String name = "testSerializerLazyInitializeInParallel";
+   // use PojoTypeInfo which will create a new serializer when 
createSerializer is invoked.
+   final TestStateDescriptor desc =
+   new TestStateDescriptor<>(name, new 
PojoTypeInfo<>(String.class, new ArrayList<>()));
+   final int threadNumber = 20;
+   final ArrayList threads = new ArrayList<>(threadNumber);
+   final ExecutionConfig executionConfig = 
Mockito.mock(ExecutionConfig.class);
 
 Review comment:
   As a rule of thumb in Flink, we try to avoid mocks as much as possible.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] 
Make serializer lazy initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#discussion_r289264163
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
 ##
 @@ -200,6 +204,31 @@ public void testEqualsSameNameAndTypeDifferentClass() 
throws Exception {
assertNotEquals(descr1, descr2);
}
 
+   @Test
+   public void testSerializerLazyInitializeInParallel() throws Exception {
+   final String name = "testSerializerLazyInitializeInParallel";
+   // use PojoTypeInfo which will create a new serializer when 
createSerializer is invoked.
+   final TestStateDescriptor desc =
+   new TestStateDescriptor<>(name, new 
PojoTypeInfo<>(String.class, new ArrayList<>()));
+   final int threadNumber = 20;
+   final ArrayList threads = new ArrayList<>(threadNumber);
+   final ExecutionConfig executionConfig = 
Mockito.mock(ExecutionConfig.class);
+   final ConcurrentHashMap> 
serializers = new ConcurrentHashMap<>();
+   for (int i = 0; i < threadNumber; i++) {
+   threads.add(new Thread(() -> {
+   
desc.initializeSerializerUnlessSet(executionConfig);
+   TypeSerializer serializer = 
desc.getOriginalSerializer();
+   
serializers.put(System.identityHashCode(serializer), serializer);
+   }));
+   }
+   threads.forEach(Thread::start);
+   for (Thread t : threads) {
+   t.join();
 
 Review comment:
   We should maybe also check for exceptions in these threads. You can use 
Flink's `CheckedThread` for this purpose.
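   
   A minimal sketch of that suggestion, reusing the `desc`, `executionConfig`, and `serializers` variables from the test snippet above and assuming Flink's `org.apache.flink.core.testutils.CheckedThread` test utility:
   ```java
   // CheckedThread rethrows any exception from go() when sync() is called.
   final List<CheckedThread> threads = new ArrayList<>(threadNumber);
   for (int i = 0; i < threadNumber; i++) {
       threads.add(new CheckedThread() {
           @Override
           public void go() throws Exception {
               desc.initializeSerializerUnlessSet(executionConfig);
               TypeSerializer<String> serializer = desc.getOriginalSerializer();
               serializers.put(System.identityHashCode(serializer), serializer);
           }
       });
   }
   threads.forEach(Thread::start);
   for (CheckedThread t : threads) {
       t.sync(); // joins and propagates any exception thrown inside go()
   }
   ```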


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] 
Make serializer lazy initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#discussion_r289262391
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 ##
 @@ -376,6 +387,9 @@ private void readObject(final ObjectInputStream in) throws 
IOException, ClassNot
 
in.readFully(buffer);
 
+   TypeSerializer serializer = 
serializerAtomicReference.get();
+   checkNotNull(serializer, "Serializer not yet 
initialized.");
 
 Review comment:
   Same here: move this to the beginning of the if-branch.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] 
Make serializer lazy initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#discussion_r289263899
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
 ##
 @@ -224,6 +253,15 @@ public void testEqualsSameNameAndTypeDifferentClass() 
throws Exception {
public Type getType() {
return Type.VALUE;
}
+
+   public TypeSerializer getOriginalSerializer() {
 
 Review comment:
   This would not be necessary if we move this to the `StateDescriptor` class 
itself, and make the atomic reference field private. IMO, this is safer (please 
see my other comment above).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] 
Make serializer lazy initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#discussion_r289262323
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 ##
 @@ -343,6 +351,9 @@ private void writeObject(final ObjectOutputStream out) 
throws IOException {
// we have a default value
out.writeBoolean(true);
 
+   TypeSerializer serializer = 
serializerAtomicReference.get();
+   checkNotNull(serializer, "Serializer not yet 
initialized.");
 
 Review comment:
   IMO, it would be cleaner to move this check to the beginning of the else-branch.
   Moving early returns / validations as early as possible improves readability.
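   
   A rough sketch of the suggested ordering (illustrative only, not the exact `writeObject` body):
   ```java
   if (defaultValue == null) {
       // no default value
       out.writeBoolean(false);
   } else {
       // validate at the top of the branch, before doing any of its work
       TypeSerializer<T> serializer = serializerAtomicReference.get();
       checkNotNull(serializer, "Serializer not yet initialized.");
   
       // we have a default value
       out.writeBoolean(true);
       // ... duplicate `serializer` and write the default value ...
   }
   ```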


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] 
Make serializer lazy initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#discussion_r289262617
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/api/common/state/StateDescriptorTest.java
 ##
 @@ -200,6 +204,31 @@ public void testEqualsSameNameAndTypeDifferentClass() 
throws Exception {
assertNotEquals(descr1, descr2);
}
 
+   @Test
+   public void testSerializerLazyInitializeInParallel() throws Exception {
+   final String name = "testSerializerLazyInitializeInParallel";
+   // use PojoTypeInfo which will create a new serializer when 
createSerializer is invoked.
+   final TestStateDescriptor desc =
+   new TestStateDescriptor<>(name, new 
PojoTypeInfo<>(String.class, new ArrayList<>()));
+   final int threadNumber = 20;
+   final ArrayList threads = new ArrayList<>(threadNumber);
+   final ExecutionConfig executionConfig = 
Mockito.mock(ExecutionConfig.class);
 
 Review comment:
   No need to use Mockito here. `new ExecutionConfig()` would suffice.
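   
   In other words (sketch):
   ```java
   // A plain ExecutionConfig is enough for this test; no mocking framework required.
   final ExecutionConfig executionConfig = new ExecutionConfig();
   ```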


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai commented on a change in pull request #8570: [FLINK-12688] [state] 
Make serializer lazy initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#discussion_r289263588
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
 ##
 @@ -82,8 +88,8 @@
/** The serializer for the type. May be eagerly initialized in the 
constructor,
 * or lazily once the {@link 
#initializeSerializerUnlessSet(ExecutionConfig)} method
 * is called. */
-   @Nullable
-   protected TypeSerializer serializer;
+   @VisibleForTesting
+   protected final AtomicReference> 
serializerAtomicReference = new AtomicReference<>();
 
 Review comment:
   It's still a bit dangerous to make this `protected`, IMO. We can't guarantee that subclasses won't mess around with it.
   
   If we want to make this visible for testing, a separate package-private final method for getting the original wrapped serializer would be better.
   
   WDYT?
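   
   A rough sketch of that alternative (names are illustrative, not necessarily what the PR ends up with): the field stays private, and tests go through a package-private accessor.
   ```java
   private final AtomicReference<TypeSerializer<T>> serializerAtomicReference =
           new AtomicReference<>();
   
   @VisibleForTesting
   final TypeSerializer<T> getOriginalSerializer() {
       TypeSerializer<T> serializer = serializerAtomicReference.get();
       if (serializer == null) {
           throw new IllegalStateException("Serializer not yet initialized.");
       }
       return serializer;
   }
   ```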


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8567: [FLINK-12676][table][sql-client] Add descriptor, validator, and factory of GenericInMemoryCatalog for table discovery service

2019-05-30 Thread GitBox
bowenli86 commented on issue #8567: [FLINK-12676][table][sql-client] Add 
descriptor, validator, and factory of GenericInMemoryCatalog for table 
discovery service
URL: https://github.com/apache/flink/pull/8567#issuecomment-497580939
 
 
   @xuefuz can you take another look?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289260306
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveRecordSerDe.java
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
 
 Review comment:
   +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289260559
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
 ##
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfigOptions;
+import org.apache.flink.table.api.TableImpl;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
+import org.apache.flink.table.runtime.utils.TableUtil;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import scala.collection.Seq;
+
+/**
+ * Tests {@link HiveTableSource}.
+ */
+public class HiveTableSourceTest {
+
+   public static final String DEFAULT_SERDE_CLASS = 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName();
+   public static final String DEFAULT_INPUT_FORMAT_CLASS = 
org.apache.hadoop.mapred.TextInputFormat.class.getName();
+   public static final String DEFAULT_OUTPUT_FORMAT_CLASS = 
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName();
+
+   private static HiveCatalog hiveCatalog;
+   private static HiveConf hiveConf;
+
+   @BeforeClass
+   public static void createCatalog() throws IOException {
+   hiveConf = HiveTestUtils.getHiveConf();
+   hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
+   hiveCatalog.open();
+   }
+
+   @AfterClass
+   public static void closeCatalog() {
+   if (null != hiveCatalog) {
+   hiveCatalog.close();
+   }
+   }
+
+   @Test
+   public void testReadNonPartitionedTable() throws Exception {
+   final String dbName = "default";
+   final String tblName = "test";
+   TableSchema tableSchema = new TableSchema(
+   new String[]{"a", "b", "c", "d", "e"},
+   new TypeInformation[]{
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.LONG_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO}
+   );
+   //Now we used metaStore client to create hive table instead of 
using hiveCatalog for it doesn't support set
+   //serDe temporarily.
+   IMetaStoreClient client = 
RetryingMetaStoreClient.getProxy(hiveConf, null, null, 
HiveMetaStoreClient.class.getName(), true);
+   org.apache.hadoop.hive.metastore.api.Table tbl = new 
org.apache.hadoop.hive.metastore.api.Table();
+   tbl.setDbName(dbName);
+   tbl.setTableName(tblName);
+   

[GitHub] [flink] bowenli86 commented on a change in pull request #8522: [FLINK-12572][hive]Implement HiveInputFormat to read Hive tables

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8522: 
[FLINK-12572][hive]Implement HiveInputFormat to read Hive tables
URL: https://github.com/apache/flink/pull/8522#discussion_r289260297
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/FlinkHiveException.java
 ##
 @@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Exception used by {@link HiveTableInputFormat} and {@link 
HiveTableOutputFormat}.
+ */
+@PublicEvolving
+public class FlinkHiveException extends RuntimeException {
+
+   private static final long serialVersionUID = 920269130311214200L;
 
 Review comment:
   do we need to serialize FlinkHiveException ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] Implement OutputFormat to write Hive tables

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8536: [FLINK-12568][hive] 
Implement OutputFormat to write Hive tables
URL: https://github.com/apache/flink/pull/8536#discussion_r289259721
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/batch/connectors/hive/HiveTableOutputFormat.java
 ##
 @@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.hive;
+
+import org.apache.flink.api.common.io.FinalizeOnMaster;
+import org.apache.flink.api.common.io.InitializeOnMaster;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
+import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.HMSClientFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+/**
+ * HiveTableOutputFormat used to write data to hive table, including 
non-partition and partitioned table.
+ */
+public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase 
implements InitializeOnMaster,
+   FinalizeOnMaster {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveTableOutputFormat.class);
+
+   private static final long serialVersionUID = 5167529504848109023L;
+
+   private transient JobConf jobConf;
+   private transient String dbName;
+   private transient String tableName;
+   private transient List partitionCols;
+   private transient RowTypeInfo rowTypeInfo;
+   private transient HiveTablePartition 

[jira] [Commented] (FLINK-5243) Implement an example for BipartiteGraph

2019-05-30 Thread Jasleen Kaur (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852672#comment-16852672
 ] 

Jasleen Kaur commented on FLINK-5243:
-

[~greghogan] Thanks for your reply. I was planning to implement 
[https://en.wikipedia.org/wiki/Hopcroft%E2%80%93Karp_algorithm] using the GSA API of 
Flink Gelly. Unfortunately, those APIs are not available for bipartite graphs. Even if 
I use Flink's top and bottom projections (to apply GSA), that will not give the 
correct adjacency list. I guess using the Graph data structure for implementing 
bipartite matching would be easier. I will spend some more time on this over 
the weekend.
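
As a plain-Java illustration of the augmenting-path idea behind bipartite matching (this is Kuhn's algorithm, O(V*E), not Hopcroft-Karp itself, and it does not use the Gelly GSA API):
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal augmenting-path bipartite matching (Kuhn's algorithm).
public class SimpleBipartiteMatching {

    private final List<List<Integer>> adj; // left vertex -> adjacent right vertices
    private final int[] matchRight;        // matchRight[r] = left vertex matched to r, or -1
    private boolean[] visited;

    public SimpleBipartiteMatching(int leftCount, int rightCount) {
        adj = new ArrayList<>();
        for (int i = 0; i < leftCount; i++) {
            adj.add(new ArrayList<>());
        }
        matchRight = new int[rightCount];
        Arrays.fill(matchRight, -1);
    }

    public void addEdge(int left, int right) {
        adj.get(left).add(right);
    }

    /** Returns the size of a maximum matching. */
    public int maxMatching() {
        int matching = 0;
        for (int u = 0; u < adj.size(); u++) {
            visited = new boolean[matchRight.length];
            if (tryAugment(u)) {
                matching++;
            }
        }
        return matching;
    }

    // DFS for an augmenting path starting at left vertex u.
    private boolean tryAugment(int u) {
        for (int v : adj.get(u)) {
            if (!visited[v]) {
                visited[v] = true;
                if (matchRight[v] == -1 || tryAugment(matchRight[v])) {
                    matchRight[v] = u;
                    return true;
                }
            }
        }
        return false;
    }
}
{code}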

> Implement an example for BipartiteGraph
> ---
>
> Key: FLINK-5243
> URL: https://issues.apache.org/jira/browse/FLINK-5243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Library / Graph Processing (Gelly)
>Reporter: Ivan Mushketyk
>Priority: Major
>  Labels: beginner
>
> Should implement example for BipartiteGraph in gelly-examples project 
> similarly to examples for Graph class.
> Depends on this: https://issues.apache.org/jira/browse/FLINK-2254



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tzulitai edited a comment on issue #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai edited a comment on issue #8570: [FLINK-12688] [state] Make serializer 
lazy initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#issuecomment-497577479
 
 
   @flinkbot approve consensus
   
   I'm approving the consensus to fix the problem like this, before we attempt 
to tackle the larger-scale problem that serializers have to be lazily 
instantiated because the `ExecutionConfig` is not available when the descriptor is created.
   
   Just as food for thought: there was a similar attempt in the past to make 
the `StateDescriptor` safe to use across different threads w.r.t. accessing 
the state serializer: https://issues.apache.org/jira/browse/FLINK-6775. 
Apparently, that only partially solved the issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai edited a comment on issue #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai edited a comment on issue #8570: [FLINK-12688] [state] Make serializer 
lazy initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#issuecomment-497577479
 
 
   @flinkbot approve consensus
   
   I'm approving the consensus to fix the problem like this, before we attempt 
to tackle the larger-scale problem that serializers have to be lazily 
instantiated because the `ExecutionConfig` is not yet available.
   
   Just as food for thought: there was a similar attempt in the past to make 
the `StateDescriptor` safe to use across different threads w.r.t. accessing 
the state serializer: https://issues.apache.org/jira/browse/FLINK-6775. 
Apparently, that only partially solved the issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12683) Provide task manager's location information for checkpoint coordinator specific log messages

2019-05-30 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852668#comment-16852668
 ] 

vinoyang commented on FLINK-12683:
--

I think for logging, good practice is that a message should carry enough information 
to locate the problem on its own, rather than requiring the reader to go through the 
whole log stream or the surrounding context. Not everyone knows the meaning of every 
log message. In addition, if there are thousands of subtasks, it is hard to work out 
the mapping relationship; when we collect key issue messages from the job manager 
log file, we do not collect all the log messages. cc 
[~till.rohrmann]

> Provide task manager's location information for checkpoint coordinator 
> specific log messages
> 
>
> Key: FLINK-12683
> URL: https://issues.apache.org/jira/browse/FLINK-12683
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, the {{AcknowledgeCheckpoint}} does not contain the task manager's 
> location information. When a task's snapshot task sends an ack message to the 
> coordinator, we can only log this message:
> {code:java}
> Received late message for now expired checkpoint attempt 6035 from 
> ccd88d08bf82245f3466c9480fb5687a of job 775ef8ff0159b071da7804925bbd362f.
> {code}
> Sometimes we need to get this sub task's location information to do the 
> further debug work, e.g. stack trace dump. But, without the location 
> information, It will not help to quickly locate the problem.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] tzulitai commented on issue #8570: [FLINK-12688] [state] Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread GitBox
tzulitai commented on issue #8570: [FLINK-12688] [state] Make serializer lazy 
initialization thread safe in StateDescriptor
URL: https://github.com/apache/flink/pull/8570#issuecomment-497577479
 
 
   @flinkbot approve consensus
   
   I'm approving the consensus to fix the problem like this, before we attempt 
to tackle the problem that serializers can only be lazily instantiated because the 
`ExecutionConfig` is not yet available.
   
   Just as food for thought: there was a similar attempt in the past to make 
the `StateDescriptor` safe to use across different threads w.r.t. accessing 
the state serializer: https://issues.apache.org/jira/browse/FLINK-6775. 
Apparently, that only partially solved the issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add a shim layer to support multiple versions of HMS

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add 
a shim layer to support multiple versions of HMS
URL: https://github.com/apache/flink/pull/8564#discussion_r289258274
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -423,4 +423,22 @@ under the License.



+
+   
+   
+   hive-1
+   
+   1.2.1
+   
2.6.0
+   
+   
+   
 
 Review comment:
   it was excluded because it wasn't used previously. OK, I will open a follow-up 
PR to make hive-metastore 'provided' and remove this dependency.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add a shim layer to support multiple versions of HMS

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add 
a shim layer to support multiple versions of HMS
URL: https://github.com/apache/flink/pull/8564#discussion_r289258387
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientWrapper.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Wrapper class for Hive Metastore Client, which embeds a HiveShim layer to 
handle different Hive versions.
+ * Methods provided mostly conforms to IMetaStoreClient interfaces except 
those that require shims.
+ */
+public class HiveMetastoreClientWrapper implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveMetastoreClientWrapper.class);
+
+   private final IMetaStoreClient client;
+   private final HiveConf hiveConf;
+
+   public HiveMetastoreClientWrapper(HiveConf hiveConf) {
+   this.hiveConf = hiveConf;
 
 Review comment:
   nit: can be `this.hiveConf = Preconditions.checkNotNull(hiveConf, "HiveConf 
cannot be null");`  No worries, I can fix this when merging the code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add a shim layer to support multiple versions of HMS

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add 
a shim layer to support multiple versions of HMS
URL: https://github.com/apache/flink/pull/8564#discussion_r289258244
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -423,4 +423,22 @@ under the License.



+
+   
+   
 
 Review comment:
   We can just add a comment here as ?
   
   BTW, shall we name this profile `hive-1.2.1` to match its Hive version?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add a shim layer to support multiple versions of HMS

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add 
a shim layer to support multiple versions of HMS
URL: https://github.com/apache/flink/pull/8564#discussion_r289258274
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -423,4 +423,22 @@ under the License.



+
+   
+   
+   hive-1
+   
+   1.2.1
+   
2.6.0
+   
+   
+   
 
 Review comment:
   it was excluded because it wasn't used previously. OK, I will open a follow-up 
PR to make hive-metastore 'provided' and remove this dependency.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add a shim layer to support multiple versions of HMS

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add 
a shim layer to support multiple versions of HMS
URL: https://github.com/apache/flink/pull/8564#discussion_r289258121
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveMetastoreClientWrapper.java
 ##
 @@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Wrapper class for Hive Metastore Client, which embeds a HiveShim layer to 
handle different Hive versions.
+ * Methods provided mostly conforms to IMetaStoreClient interfaces except 
those that require shims.
+ */
+public class HiveMetastoreClientWrapper implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HiveMetastoreClientWrapper.class);
+
+   private final IMetaStoreClient client;
+   private final HiveConf hiveConf;
+
+   public HiveMetastoreClientWrapper(HiveConf hiveConf) {
+   this.hiveConf = hiveConf;
 
 Review comment:
   nit: this can be `this.hiveConf = Preconditions.checkNotNull(hiveConf, "HiveConf 
cannot be null");`. No worries, I can fix this when merging the code.
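
   For illustration, a minimal sketch of how the suggested check could look. This is a 
stripped-down stand-in for the class above (only the `hiveConf` field is kept, the client 
and shim wiring are omitted), not the full wrapper:

```java
import org.apache.flink.util.Preconditions;

import org.apache.hadoop.hive.conf.HiveConf;

public class HiveMetastoreClientWrapperSketch implements AutoCloseable {

	private final HiveConf hiveConf;

	public HiveMetastoreClientWrapperSketch(HiveConf hiveConf) {
		// Fail fast with a descriptive message instead of a NullPointerException later on.
		this.hiveConf = Preconditions.checkNotNull(hiveConf, "HiveConf cannot be null");
	}

	@Override
	public void close() {
		// Client shutdown is omitted in this sketch.
	}
}
```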


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add a shim layer to support multiple versions of HMS

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add 
a shim layer to support multiple versions of HMS
URL: https://github.com/apache/flink/pull/8564#discussion_r289257720
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -423,4 +423,22 @@ under the License.



+
+   
+   
+   hive-1
+   
+   1.2.1
+   
2.6.0
+   
+   
+   
 
 Review comment:
   it's excluded because it wasn't used previously. I think we can remove this 
as well as its exclusion in hive-metastore. I will open a followup 
PR to make hive-metastore 'provided'.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add a shim layer to support multiple versions of HMS

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8564: [FLINK-12649][hive] Add 
a shim layer to support multiple versions of HMS
URL: https://github.com/apache/flink/pull/8564#discussion_r289257545
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -423,4 +423,22 @@ under the License.



+
+   
+   
 
 Review comment:
   We can just add a comment here as ``?
   
   BTW, shall we name this profile `hive-1.2.1` to match its Hive version?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11987) Kafka producer occasionally throws NullpointerException

2019-05-30 Thread Yu Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852663#comment-16852663
 ] 

Yu Li commented on FLINK-11987:
---

Regarding the issue mentioned by [~chethanuk]: from the posted stack trace, the NPE 
was thrown from inside the user-defined function (UDF), so it is probably not a 
problem in Flink. Could you double check your UDF code, [~chethanuk]? Thanks.

> Kafka producer occasionally throws NullpointerException
> ---
>
> Key: FLINK-11987
> URL: https://issues.apache.org/jira/browse/FLINK-11987
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.6.3, 1.6.4, 1.7.2
> Environment: Flink 1.6.2 (Standalone Cluster)
> Oracle JDK 1.8u151
> Centos 7.4
>Reporter: LIU Xiao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 1.6.5, 1.8.1
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> We are using Flink 1.6.2 in our production environment, and the Kafka producer 
> occasionally throws a NullPointerException.
> We found in line 175 of 
> flink/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java,
>  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR was created as a static variable.
> Then in line 837, 
> {code:java}
> context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
> {code}
> was called, and that leads to line 734 of 
>  
> flink/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java:
>  
> {code:java}
> stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
> {code}
> In function initializeSerializerUnlessSet(line 283 of 
> flink/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java):
> {code:java}
> if (serializer == null) {
>   checkState(typeInfo != null, "no serializer and no type info");
>   // instantiate the serializer
>   serializer = typeInfo.createSerializer(executionConfig);
>   // we can drop the type info now, no longer needed
>   typeInfo  = null;
> }
> {code}
> "serializer = typeInfo.createSerializer(executionConfig);" is the line that 
> throws the exception.
> We think that's because multiple subtasks of the same producer in the same 
> TaskManager share the same NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.
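
To make the failure mode concrete, below is a small, self-contained sketch (plain Java, 
not Flink classes; the field and method names only mirror the pattern quoted above) of why 
such an unsynchronized lazy initialization can fail intermittently when two threads race: 
one thread can null out the type info while the other is between the null check and the 
create call.

{code:java}
public class LazyInitRaceDemo {

    private Object typeInfo = new Object(); // stand-in for TypeInformation
    private Object serializer;              // stand-in for TypeSerializer

    // Unsafe lazy initialization, mirroring the pattern quoted above.
    void initializeSerializerUnlessSet() {
        if (serializer == null) {
            // Another thread may already have executed this block and set
            // typeInfo to null; createSerializer then dereferences null.
            serializer = createSerializer(typeInfo);
            typeInfo = null;
        }
    }

    private static Object createSerializer(Object typeInfo) {
        return typeInfo.toString(); // throws NullPointerException if typeInfo is null
    }

    public static void main(String[] args) throws InterruptedException {
        // Repeat many times; the race only strikes occasionally, as in the report above.
        for (int i = 0; i < 10_000; i++) {
            LazyInitRaceDemo demo = new LazyInitRaceDemo();
            Thread t1 = new Thread(demo::initializeSerializerUnlessSet);
            Thread t2 = new Thread(demo::initializeSerializerUnlessSet);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
        }
    }
}
{code}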



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12683) Provide task manager's location information for checkpoint coordinator specific log messages

2019-05-30 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852658#comment-16852658
 ] 

Congxian Qiu(klion26) commented on FLINK-12683:
---

In my opinion, the JM log of the current version (1.8) is enough (we can find the 
task location in the JM log now), so I don't think we need this patch.

> Provide task manager's location information for checkpoint coordinator 
> specific log messages
> 
>
> Key: FLINK-12683
> URL: https://issues.apache.org/jira/browse/FLINK-12683
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, the {{AcknowledgeCheckpoint}} does not contain the task manager's 
> location information. When a task's snapshot sends an ack message to the 
> coordinator, we can only log this message:
> {code:java}
> Received late message for now expired checkpoint attempt 6035 from 
> ccd88d08bf82245f3466c9480fb5687a of job 775ef8ff0159b071da7804925bbd362f.
> {code}
> Sometimes we need this subtask's location information for further debugging 
> work, e.g. a stack trace dump. But without the location information, it is 
> hard to quickly locate the problem.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11987) Kafka producer occasionally throws NullpointerException

2019-05-30 Thread Yu Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852656#comment-16852656
 ] 

Yu Li commented on FLINK-11987:
---

Have created FLINK-12688 to fix the issue from inside {{StateDescriptor}} as 
suggested above, and marked it as a blocker of this one.

> Kafka producer occasionally throws NullpointerException
> ---
>
> Key: FLINK-11987
> URL: https://issues.apache.org/jira/browse/FLINK-11987
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.6.3, 1.6.4, 1.7.2
> Environment: Flink 1.6.2 (Standalone Cluster)
> Oracle JDK 1.8u151
> Centos 7.4
>Reporter: LIU Xiao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.9.0, 1.6.5, 1.8.1
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> We are using Flink 1.6.2 in our production environment, and the Kafka producer 
> occasionally throws a NullPointerException.
> We found in line 175 of 
> flink/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java,
>  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR was created as a static variable.
> Then in line 837, 
> {code:java}
> context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
> {code}
> was called, and that leads to line 734 of 
>  
> flink/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java:
>  
> {code:java}
> stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
> {code}
> In function initializeSerializerUnlessSet(line 283 of 
> flink/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java):
> {code:java}
> if (serializer == null) {
>   checkState(typeInfo != null, "no serializer and no type info");
>   // instantiate the serializer
>   serializer = typeInfo.createSerializer(executionConfig);
>   // we can drop the type info now, no longer needed
>   typeInfo  = null;
> }
> {code}
> "serializer = typeInfo.createSerializer(executionConfig);" is the line that 
> throws the exception.
> We think that's because multiple subtasks of the same producer in the same 
> TaskManager share the same NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8580: [FLINK-12683] Provide task manager location information for checkpoint coordinator specific log messages

2019-05-30 Thread GitBox
flinkbot commented on issue #8580: [FLINK-12683] Provide task manager location 
information for checkpoint coordinator specific log messages
URL: https://github.com/apache/flink/pull/8580#issuecomment-497573926
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12688) Make serializer lazy initialization thread safe in StateDescriptor

2019-05-30 Thread Yu Li (JIRA)
Yu Li created FLINK-12688:
-

 Summary: Make serializer lazy initialization thread safe in 
StateDescriptor
 Key: FLINK-12688
 URL: https://issues.apache.org/jira/browse/FLINK-12688
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream, API / Type Serialization System
Affects Versions: 1.8.0, 1.7.2, 1.6.4
Reporter: Yu Li
Assignee: Yu Li
 Fix For: 1.9.0, 1.8.1


As 
[discussed|https://issues.apache.org/jira/browse/FLINK-11987?focusedCommentId=16832335=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16832335]
 in FLINK-11987, this JIRA aims at resolving the issue of 
{{StateDescriptor#initializeSerializerUnlessSet}} under race conditions. More 
specifically, we will ensure that no NPE is thrown and that {{serializer}} stays 
a singleton when {{initializeSerializerUnlessSet}} is invoked in parallel.
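
For illustration only, one straightforward way to get both guarantees is to guard the 
lazy initialization, e.g. with synchronization. Below is a minimal sketch of that idea, 
reusing the field and method names quoted in FLINK-11987; it is not necessarily what the 
eventual fix looks like:

{code:java}
// Sketch only: synchronize the lazy initialization so that at most one thread
// creates the serializer and typeInfo is never read after being nulled.
public synchronized void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
    if (serializer == null) {
        checkState(typeInfo != null, "no serializer and no type info");
        // Only one thread can be here at a time, so exactly one serializer
        // instance is created and cached; later callers see the cached one.
        serializer = typeInfo.createSerializer(executionConfig);
        typeInfo = null;
    }
}
{code}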



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12683) Provide task manager's location information for checkpoint coordinator specific log messages

2019-05-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12683:
---
Labels: pull-request-available  (was: )

> Provide task manager's location information for checkpoint coordinator 
> specific log messages
> 
>
> Key: FLINK-12683
> URL: https://issues.apache.org/jira/browse/FLINK-12683
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the {{AcknowledgeCheckpoint}} does not contain the task manager's 
> location information. When a task's snapshot sends an ack message to the 
> coordinator, we can only log this message:
> {code:java}
> Received late message for now expired checkpoint attempt 6035 from 
> ccd88d08bf82245f3466c9480fb5687a of job 775ef8ff0159b071da7804925bbd362f.
> {code}
> Sometimes we need this subtask's location information for further debugging 
> work, e.g. a stack trace dump. But without the location information, it is 
> hard to quickly locate the problem.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] yanghua opened a new pull request #8580: [FLINK-12683] Provide task manager location information for checkpoint coordinator specific log messages

2019-05-30 Thread GitBox
yanghua opened a new pull request #8580: [FLINK-12683] Provide task manager 
location information for checkpoint coordinator specific log messages
URL: https://github.com/apache/flink/pull/8580
 
 
   ## What is the purpose of the change
   
   *This pull request provides task manager location information for checkpoint 
coordinator specific log messages*
   
   
   ## Brief change log
   
 - *Added a new parameter `TaskManagerLocation taskManagerLocation` to the 
methods `receiveAcknowledgeMessage` and `receiveDeclineMessage`*
 - *Changed specific test case*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12687) ByteHashSet is always in dense mode

2019-05-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12687:
---
Labels: pull-request-available  (was: )

> ByteHashSet is always in dense mode
> ---
>
> Key: FLINK-12687
> URL: https://issues.apache.org/jira/browse/FLINK-12687
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Operators
>Reporter: Liya Fan
>Assignee: Liya Fan
>Priority: Minor
>  Labels: pull-request-available
>
> Since there are only 256 possible byte values, the largest possible range is 
> 255, and the condition 
> range < OptimizableHashSet.DENSE_THRESHOLD
> must be satisfied. So ByteHashSet must be in dense mode.
> We can make use of this to improve the performance and code structure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8579: [FLINK-12687][Runtime / Operators] ByteHashSet is always in dense mode

2019-05-30 Thread GitBox
flinkbot commented on issue #8579: [FLINK-12687][Runtime / Operators] 
ByteHashSet is always in dense mode
URL: https://github.com/apache/flink/pull/8579#issuecomment-497573552
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] liyafan82 opened a new pull request #8579: [FLINK-12687][Runtime / Operators] ByteHashSet is always in dense mode

2019-05-30 Thread GitBox
liyafan82 opened a new pull request #8579: [FLINK-12687][Runtime / Operators] 
ByteHashSet is always in dense mode
URL: https://github.com/apache/flink/pull/8579
 
 
   
   
   ## What is the purpose of the change
   
   Improve the performance and code structure for 
org.apache.flink.table.runtime.util.collections.ByteHashSet.
   
   Since there are only 256 possible byte values, the largest possible range is 
255, and the condition 
   
   range < OptimizableHashSet.DENSE_THRESHOLD
   
   must be satisfied. So ByteHashSet must be in dense mode.
   
   We can make use of this to improve the performance and code structure.
   
   
   ## Brief change log
   
 - Change the code so it only considers the case for dense mode.
 
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12687) ByteHashSet is always in dense mode

2019-05-30 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12687:


 Summary: ByteHashSet is always in dense mode
 Key: FLINK-12687
 URL: https://issues.apache.org/jira/browse/FLINK-12687
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Operators
Reporter: Liya Fan
Assignee: Liya Fan


Since there are only 256 possible byte values, the largest possible range is 
255, and the condition 

range < OptimizableHashSet.DENSE_THRESHOLD

must be satisfied. So ByteHashSet must be in dense mode.

We can make use of this to improve the performance and code structure.
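
For anyone reading along, the dense mode being discussed boils down to indexing a fixed 
256-slot table directly by the byte value. A small, self-contained sketch of that idea 
follows; it is not the actual org.apache.flink.table.runtime.util.collections.ByteHashSet 
implementation:

{code:java}
/** Dense set over all 256 possible byte values: add/contains are a single array access. */
public final class DenseByteSetSketch {

    // index = value - Byte.MIN_VALUE maps every byte into [0, 255]
    private final boolean[] used = new boolean[256];

    public void add(byte value) {
        used[value - Byte.MIN_VALUE] = true;
    }

    public boolean contains(byte value) {
        return used[value - Byte.MIN_VALUE];
    }

    public static void main(String[] args) {
        DenseByteSetSketch set = new DenseByteSetSketch();
        set.add((byte) -128);
        set.add((byte) 42);
        System.out.println(set.contains((byte) 42));  // true
        System.out.println(set.contains((byte) 7));   // false
    }
}
{code}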



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12686) StreamGraph Supports Blink Batch Mode

2019-05-30 Thread Biao Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-12686:
-
Description: This is a part of merging the Blink batch runner. I would like to 
improve the StreamGraph to support both streaming and batch jobs. There is a 
Google doc describing this proposal; here is the 
[link|https://docs.google.com/document/d/17X6csUWKUdVn55c47YbOTEipHDWFt_1upyTPrBD6Fjc/edit?usp=sharing].
 This proposal is under discussion, any feedback is welcome!  (was: This a part 
of merging Blink batch runner. I would like to improvement the StreamGraph to 
support both streaming job and batch job. There is a google doc to describe 
this proposal, here is the 
[link|https://docs.google.com/document/d/17X6csUWKUdVn55c47YbOTEipHDWFt_1upyTPrBD6Fjc/edit?usp=sharing].
 This proposal is under discussion, any feedback is welcome!)

> StreamGraph Supports Blink Batch Mode
> -
>
> Key: FLINK-12686
> URL: https://issues.apache.org/jira/browse/FLINK-12686
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Reporter: Biao Liu
>Priority: Major
> Fix For: 1.9.0
>
>
> This is a part of merging the Blink batch runner. I would like to improve the 
> StreamGraph to support both streaming and batch jobs. There is a Google doc 
> describing this proposal; here is the 
> [link|https://docs.google.com/document/d/17X6csUWKUdVn55c47YbOTEipHDWFt_1upyTPrBD6Fjc/edit?usp=sharing].
>  This proposal is under discussion, any feedback is welcome!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12686) StreamGraph Supports Blink Batch Mode

2019-05-30 Thread Biao Liu (JIRA)
Biao Liu created FLINK-12686:


 Summary: StreamGraph Supports Blink Batch Mode
 Key: FLINK-12686
 URL: https://issues.apache.org/jira/browse/FLINK-12686
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Reporter: Biao Liu
 Fix For: 1.9.0


This is a part of merging the Blink batch runner. I would like to improve the 
StreamGraph to support both streaming and batch jobs. There is a Google doc 
describing this proposal; here is the 
[link|https://docs.google.com/document/d/17X6csUWKUdVn55c47YbOTEipHDWFt_1upyTPrBD6Fjc/edit?usp=sharing].
 This proposal is under discussion, any feedback is welcome!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289249491
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/PartitionDescriptor.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Partition descriptor for {@link ShuffleMaster} to obtain {@link 
ShuffleDescriptor}.
+ */
+public class PartitionDescriptor implements Serializable {
+
+   private static final long serialVersionUID = 6343547936086963705L;
+
+   /** The ID of the result this partition belongs to. */
+   private final IntermediateDataSetID resultId;
+
+   /** The ID of the partition. */
+   private final IntermediateResultPartitionID partitionId;
+
+   /** The type of the partition. */
+   private final ResultPartitionType partitionType;
+
+   /** The number of subpartitions. */
+   private final int numberOfSubpartitions;
+
+   /** Connection index to identify this partition of intermediate result. 
*/
+   private final int connectionIndex;
+
+   public PartitionDescriptor(
+   IntermediateDataSetID resultId,
+   IntermediateResultPartitionID partitionId,
+   ResultPartitionType partitionType,
+   int numberOfSubpartitions,
+   int connectionIndex) {
+   this.resultId = resultId;
+   this.partitionId = partitionId;
+   this.partitionType = partitionType;
+   checkArgument(numberOfSubpartitions >= 1);
+   this.numberOfSubpartitions = numberOfSubpartitions;
+   this.connectionIndex = connectionIndex;
+   }
+
+   public IntermediateDataSetID getResultId() {
+   return resultId;
+   }
+
+   public IntermediateResultPartitionID getPartitionId() {
+   return partitionId;
+   }
+
+   public ResultPartitionType getPartitionType() {
+   return partitionType;
+   }
+
+   public int getNumberOfSubpartitions() {
+   return numberOfSubpartitions;
+   }
+
+   int getConnectionIndex() {
+   return connectionIndex;
+   }
+
+   @Override
+   public String toString() {
+   return String.format(
+   "PartitionDescriptor [result id: %s, partition id: %s, 
partition type: %s, " +
+   "subpartitions: %d, connection index: %d]",
+   resultId,
+   partitionId,
+   partitionType,
+   numberOfSubpartitions,
+   connectionIndex);
+   }
+
+   public static PartitionDescriptor from(IntermediateResultPartition 
partition) {
+   // The produced data is partitioned among a number of 
subpartitions.
 
 Review comment:
   checkNotNull(partition)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289249256
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/PartitionDescriptor.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Partition descriptor for {@link ShuffleMaster} to obtain {@link 
ShuffleDescriptor}.
+ */
+public class PartitionDescriptor implements Serializable {
+
+   private static final long serialVersionUID = 6343547936086963705L;
+
+   /** The ID of the result this partition belongs to. */
+   private final IntermediateDataSetID resultId;
+
+   /** The ID of the partition. */
+   private final IntermediateResultPartitionID partitionId;
+
+   /** The type of the partition. */
+   private final ResultPartitionType partitionType;
+
+   /** The number of subpartitions. */
+   private final int numberOfSubpartitions;
+
+   /** Connection index to identify this partition of intermediate result. 
*/
+   private final int connectionIndex;
+
+   public PartitionDescriptor(
 
 Review comment:
   @VisibleForTesting


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289247475
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.net.InetAddress;
+
+/**
+ * Partition producer descriptor for {@link ShuffleMaster} to obtain {@link 
ShuffleDescriptor}.
+ *
+ * The producer descriptor contains a general producer specific information 
relevant for shuffle implementation:
+ * the producer location as {@link ResourceID}, {@link ExecutionAttemptID} and 
the network connection information
+ * for shuffle data exchange (address and port).
+ */
+public class ProducerDescriptor {
+   /** The resource ID to identify the container where the producer 
execution is deployed. */
+   private final ResourceID producerResourceId;
+
+   /** The ID of the producer execution attempt. */
+   private final ExecutionAttemptID producerExecutionId;
+
+   /** The address to connect to the producer. */
+   private final InetAddress address;
+
+   /**
+* The port to connect to the producer for shuffle exchange.
+*
+* Negative value means local execution.
+*/
+   private final int dataPort;
+
+   public ProducerDescriptor(
+   ResourceID producerResourceId,
+   ExecutionAttemptID producerExecutionId,
+   InetAddress address,
+   int dataPort) {
+   this.producerResourceId = producerResourceId;
+   this.producerExecutionId = producerExecutionId;
+   this.address = address;
+   this.dataPort = dataPort;
+   }
+
+   ResourceID getProducerResourceId() {
+   return producerResourceId;
+   }
+
+   ExecutionAttemptID getProducerExecutionId() {
+   return producerExecutionId;
+   }
+
+   public InetAddress getAddress() {
+   return address;
+   }
+
+   public int getDataPort() {
+   return dataPort;
+   }
+
+   public static ProducerDescriptor create(TaskManagerLocation 
producerLocation, ExecutionAttemptID attemptId) {
+   return new ProducerDescriptor(
+   producerLocation.getResourceID(),
 
 Review comment:
   checkNotNull for related parameters


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289247275
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.net.InetAddress;
+
+/**
+ * Partition producer descriptor for {@link ShuffleMaster} to obtain {@link 
ShuffleDescriptor}.
+ *
+ * The producer descriptor contains a general producer specific information 
relevant for shuffle implementation:
+ * the producer location as {@link ResourceID}, {@link ExecutionAttemptID} and 
the network connection information
+ * for shuffle data exchange (address and port).
+ */
+public class ProducerDescriptor {
+   /** The resource ID to identify the container where the producer 
execution is deployed. */
+   private final ResourceID producerResourceId;
+
+   /** The ID of the producer execution attempt. */
+   private final ExecutionAttemptID producerExecutionId;
+
+   /** The address to connect to the producer. */
+   private final InetAddress address;
+
+   /**
+* The port to connect to the producer for shuffle exchange.
+*
+* Negative value means local execution.
+*/
+   private final int dataPort;
+
+   public ProducerDescriptor(
 
 Review comment:
   @VisibleForTesting


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289247275
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
 ##
 @@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.net.InetAddress;
+
+/**
+ * Partition producer descriptor for {@link ShuffleMaster} to obtain {@link 
ShuffleDescriptor}.
+ *
+ * The producer descriptor contains a general producer specific information 
relevant for shuffle implementation:
+ * the producer location as {@link ResourceID}, {@link ExecutionAttemptID} and 
the network connection information
+ * for shuffle data exchange (address and port).
+ */
+public class ProducerDescriptor {
+   /** The resource ID to identify the container where the producer 
execution is deployed. */
+   private final ResourceID producerResourceId;
+
+   /** The ID of the producer execution attempt. */
+   private final ExecutionAttemptID producerExecutionId;
+
+   /** The address to connect to the producer. */
+   private final InetAddress address;
+
+   /**
+* The port to connect to the producer for shuffle exchange.
+*
+* Negative value means local execution.
+*/
+   private final int dataPort;
+
+   public ProducerDescriptor(
 
 Review comment:
   @VisibleForTesting


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8531: [FLINK-12440][python] Add all connector support align Java Table API.

2019-05-30 Thread GitBox
asfgit closed pull request #8531: [FLINK-12440][python] Add all connector 
support align Java Table API.
URL: https://github.com/apache/flink/pull/8531
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12440) Add all connector support align Java Table API

2019-05-30 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12440.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master: 836fdfff0db64ff8241f38e8dd362dd50a9d1895

> Add all connector support align Java Table API
> --
>
> Key: FLINK-12440
> URL: https://issues.apache.org/jira/browse/FLINK-12440
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Add all connector support align Java Table API. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289246143
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java
 ##
 @@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Intermediate result partition registry to use in {@link 
org.apache.flink.runtime.jobmaster.JobMaster}.
+ */
+public interface ShuffleMaster {
+   CompletableFuture registerPartitionWithProducer(
 
 Review comment:
   empty line before method?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lirui-apache commented on issue #8564: [FLINK-12649][hive] Add a shim layer to support multiple versions of HMS

2019-05-30 Thread GitBox
lirui-apache commented on issue #8564: [FLINK-12649][hive] Add a shim layer to 
support multiple versions of HMS
URL: https://github.com/apache/flink/pull/8564#issuecomment-497560367
 
 
   > For Hive versions, I think for now we can claim support only for versions 
like 1.2.1 explicitly. In theory, all 1.2.x versions should be 
compatible. However, we haven't verified this, so before doing so let's be 
conservative.
   > 
   > On the other hand, if we eventually only support 1.2.1 and 2.3.4, then I 
guess a lot of users will be turned off because their versions may not match 
any of those.
   
   I think we can be conservative for now, and claim to support more versions 
once we have better test coverage. Besides, allowing users to specify the version 
might mitigate the issue, e.g. a user can explicitly specify a supported version 
even if the Hive jars in use don't exactly match the supported versions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wisgood commented on a change in pull request #8571: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-05-30 Thread GitBox
wisgood commented on a change in pull request #8571: [FLINK-12682][connectors] 
StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8571#discussion_r289245475
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -82,7 +87,7 @@ public void open(FileSystem fs, Path path) throws 
IOException {
public void write(T element) throws IOException {
FSDataOutputStream outputStream = getStream();
outputStream.write(element.toString().getBytes(charset));
-   outputStream.write('\n');
+   outputStream.write(rowDelimiter.getBytes(charset));
 
 Review comment:
   ok, done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289244295
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
 ##
 @@ -54,24 +56,31 @@
 * The index of the consumed subpartition of each consumed partition. 
This index depends on the
 * {@link DistributionPattern} and the subtask indices of the producing 
and consuming task.
 */
+   @Nonnegative
private final int consumedSubpartitionIndex;
 
/** An input channel for each consumed subpartition. */
-   private final InputChannelDeploymentDescriptor[] inputChannels;
+   private final ShuffleDescriptor[] inputChannels;
+
+   /**
+* {@link ResourceID} of partition consume to identify its location.
+*
+* It can be used e.g. to compare with partition producer {@link 
ResourceID} in
+* {@link ProducerDescriptor} to determine producer/consumer 
co-location.
+*/
+   private final ResourceID consumerLocation;
 
 Review comment:
   consumerLocation -> consumerResourceId? Because in `ProducerDescriptor` and 
`NettyShuffleDescriptor` we also name it `producerResourceId`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289243434
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
 ##
 @@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.shuffle;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+
+import java.io.Serializable;
+
+/**
+ * Interface for shuffle deployment descriptor of result partition resource.
+ */
+public interface ShuffleDescriptor extends Serializable {
+   ResultPartitionID getResultPartitionID();
 
 Review comment:
   Add empty line before this method?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce shuffle master interface

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8362: [FLINK-11391] Introduce 
shuffle master interface
URL: https://github.com/apache/flink/pull/8362#discussion_r289242090
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
 ##
 @@ -54,24 +56,31 @@
 * The index of the consumed subpartition of each consumed partition. 
This index depends on the
 * {@link DistributionPattern} and the subtask indices of the producing 
and consuming task.
 */
+   @Nonnegative
private final int consumedSubpartitionIndex;
 
/** An input channel for each consumed subpartition. */
-   private final InputChannelDeploymentDescriptor[] inputChannels;
+   private final ShuffleDescriptor[] inputChannels;
+
+   /**
+* {@link ResourceID} of partition consume to identify its location.
+*
+* It can be used e.g. to compare with partition producer {@link 
ResourceID} in
+* {@link ProducerDescriptor} to determine producer/consumer 
co-location.
+*/
+   private final ResourceID consumerLocation;
 
public InputGateDeploymentDescriptor(
IntermediateDataSetID consumedResultId,
ResultPartitionType consumedPartitionType,
-   int consumedSubpartitionIndex,
-   InputChannelDeploymentDescriptor[] inputChannels) {
-
-   this.consumedResultId = checkNotNull(consumedResultId);
-   this.consumedPartitionType = 
checkNotNull(consumedPartitionType);
-
-   checkArgument(consumedSubpartitionIndex >= 0);
+   @Nonnegative int consumedSubpartitionIndex,
+   ShuffleDescriptor[] inputChannels,
+   ResourceID consumerLocation) {
+   this.consumedResultId = consumedResultId;
 
 Review comment:
   keep checkNotNull for these parameters?
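   For example, the constructor could keep the checks while using the new parameter types (a sketch based on the diff above, not necessarily the final code):
   ```java
   public InputGateDeploymentDescriptor(
   		IntermediateDataSetID consumedResultId,
   		ResultPartitionType consumedPartitionType,
   		@Nonnegative int consumedSubpartitionIndex,
   		ShuffleDescriptor[] inputChannels,
   		ResourceID consumerLocation) {
   	// restore the explicit null checks on the reference-typed parameters
   	this.consumedResultId = checkNotNull(consumedResultId);
   	this.consumedPartitionType = checkNotNull(consumedPartitionType);
   	this.consumedSubpartitionIndex = consumedSubpartitionIndex;
   	this.inputChannels = checkNotNull(inputChannels);
   	this.consumerLocation = checkNotNull(consumerLocation);
   }
   ```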


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8531: [FLINK-12440][python] Add all connector support align Java Table API.

2019-05-30 Thread GitBox
sunjincheng121 commented on issue #8531: [FLINK-12440][python] Add all 
connector support align Java Table API.
URL: https://github.com/apache/flink/pull/8531#issuecomment-497555012
 
 
   LGTM. +1 to merge.
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Myasuka commented on a change in pull request #8571: [FLINK-12682][connectors] StringWriter support custom row delimiter

2019-05-30 Thread GitBox
Myasuka commented on a change in pull request #8571: [FLINK-12682][connectors] 
StringWriter support custom row delimiter
URL: https://github.com/apache/flink/pull/8571#discussion_r289240462
 
 

 ##
 File path: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
 ##
 @@ -82,7 +87,7 @@ public void open(FileSystem fs, Path path) throws 
IOException {
public void write(T element) throws IOException {
FSDataOutputStream outputStream = getStream();
outputStream.write(element.toString().getBytes(charset));
-   outputStream.write('\n');
+   outputStream.write(rowDelimiter.getBytes(charset));
 
 Review comment:
   We could store the `delimiter` as a byte array in a private field in 
`StringWriter.java` instead of converting it on every write.
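   For illustration, a minimal sketch of that change, assuming `rowDelimiter` and `charset` are fields that are already initialized by the time `open()` finishes (not the exact final code):
   ```java
   private transient byte[] rowDelimiterBytes;
   
   @Override
   public void open(FileSystem fs, Path path) throws IOException {
   	super.open(fs, path);
   	// convert the delimiter once here instead of on every write()
   	this.rowDelimiterBytes = rowDelimiter.getBytes(charset);
   }
   
   @Override
   public void write(T element) throws IOException {
   	FSDataOutputStream outputStream = getStream();
   	outputStream.write(element.toString().getBytes(charset));
   	outputStream.write(rowDelimiterBytes);
   }
   ```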


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12683) Provide task manager's location information for checkpoint coordinator specific log messages

2019-05-30 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852600#comment-16852600
 ] 

vinoyang commented on FLINK-12683:
--

Hi [~klion26] Here is my plan:

Changing these two methods' parameter lists to:
{code:java}
public void receiveDeclineMessage(DeclineCheckpoint message) -> public void 
receiveDeclineMessage(DeclineCheckpoint message, TaskManagerLocation 
taskManagerLocation);

public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) -> 
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, 
TaskManagerLocation taskManagerLocation);
{code}
So that we do not need to introduce the {{ExecutionGraph}} or the mapping 
relationship of {{ExecutionAttemptID}} and {{Execution}} to the 
{{CheckpointCoordinator}}. 

Currently, the caller of both methods is {{LegacyScheduler}}; in this class 
we can access the instance of {{ExecutionGraph}}, get the Execution by 
{{ExecutionAttemptID}}, and then call 
{{Execution#getAssignedResourceLocation}}. WDYT?
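
A rough sketch of what the caller side could look like under this plan, assuming the {{ExecutionGraph}} exposes its attempt-to-Execution mapping (method and field names below are illustrative, not final):
{code:java}
// inside LegacyScheduler (illustrative only)
private void acknowledgeCheckpoint(AcknowledgeCheckpoint message) {
	Execution execution =
		executionGraph.getRegisteredExecutions().get(message.getTaskExecutionId());

	// the location is resolved where the ExecutionGraph is available,
	// so the CheckpointCoordinator itself stays unaware of the graph
	TaskManagerLocation location =
		execution != null ? execution.getAssignedResourceLocation() : null;

	checkpointCoordinator.receiveAcknowledgeMessage(message, location);
}
{code}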

 

 

> Provide task manager's location information for checkpoint coordinator 
> specific log messages
> 
>
> Key: FLINK-12683
> URL: https://issues.apache.org/jira/browse/FLINK-12683
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, the {{AcknowledgeCheckpoint}} does not contain the task manager's 
> location information. When a task's snapshot task sends an ack message to the 
> coordinator, we can only log this message:
> {code:java}
> Received late message for now expired checkpoint attempt 6035 from 
> ccd88d08bf82245f3466c9480fb5687a of job 775ef8ff0159b071da7804925bbd362f.
> {code}
> Sometimes we need to get this sub task's location information to do the 
> further debug work, e.g. stack trace dump. But, without the location 
> information, It will not help to quickly locate the problem.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on a change in pull request #8566: [FLINK-12673][network] Introduce NetworkEnvironment.getUnreleasedPartitions instead of using getResultPartitionManager

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8566: [FLINK-12673][network] 
Introduce NetworkEnvironment.getUnreleasedPartitions instead of using 
getResultPartitionManager
URL: https://github.com/apache/flink/pull/8566#discussion_r289239909
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
 ##
 @@ -122,14 +123,9 @@ void onConsumedPartition(ResultPartition partition) {
}
}
 
-   public boolean areAllPartitionsReleased() {
+   public Collection<ResultPartitionID> getUnreleasedPartitions() {
synchronized (registeredPartitions) {
-   for (ResultPartition partition : 
registeredPartitions.values()) {
-   if (!partition.isReleased()) {
-   return false;
-   }
-   }
-   return true;
+   return registeredPartitions.keySet();
 
 Review comment:
   Yes, `partition#release` should also be put under the synchronized block in 
`ResultPartitionManager#onConsumedPartition`; that would solve this issue.
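   A minimal sketch of what that could look like, using the field names from this class (not a final implementation):
   ```java
   void onConsumedPartition(ResultPartition partition) {
   	synchronized (registeredPartitions) {
   		ResultPartition previous = registeredPartitions.remove(partition.getPartitionId());
   		// releasing while still holding the lock keeps getUnreleasedPartitions() consistent
   		if (previous != null) {
   			previous.release();
   		}
   	}
   }
   ```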


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12683) Provide task manager's location information for checkpoint coordinator specific log messages

2019-05-30 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-12683:
-
Summary: Provide task manager's location information for checkpoint 
coordinator specific log messages  (was: Provide task manager's location 
information for checkpoint acknowledge log message)

> Provide task manager's location information for checkpoint coordinator 
> specific log messages
> 
>
> Key: FLINK-12683
> URL: https://issues.apache.org/jira/browse/FLINK-12683
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, the {{AcknowledgeCheckpoint}} does not contain the task manager's 
> location information. When a task's snapshot task sends an ack message to the 
> coordinator, we can only log this message:
> {code:java}
> Received late message for now expired checkpoint attempt 6035 from 
> ccd88d08bf82245f3466c9480fb5687a of job 775ef8ff0159b071da7804925bbd362f.
> {code}
> Sometimes we need to get this sub task's location information to do the 
> further debug work, e.g. stack trace dump. But, without the location 
> information, It will not help to quickly locate the problem.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12683) Provide task manager's location information for checkpoint acknowledge log message

2019-05-30 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang updated FLINK-12683:
-
Summary: Provide task manager's location information for checkpoint 
acknowledge log message  (was: Provide task manager's location information for 
checkpoint acknowledge message to improve the checkpoint log message)

> Provide task manager's location information for checkpoint acknowledge log 
> message
> --
>
> Key: FLINK-12683
> URL: https://issues.apache.org/jira/browse/FLINK-12683
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, the {{AcknowledgeCheckpoint}} does not contain the task manager's 
> location information. When a task's snapshot task sends an ack message to the 
> coordinator, we can only log this message:
> {code:java}
> Received late message for now expired checkpoint attempt 6035 from 
> ccd88d08bf82245f3466c9480fb5687a of job 775ef8ff0159b071da7804925bbd362f.
> {code}
> Sometimes we need to get this sub task's location information to do the 
> further debug work, e.g. stack trace dump. But, without the location 
> information, It will not help to quickly locate the problem.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12683) Provide task manager's location information for checkpoint acknowledge message to improve the checkpoint log message

2019-05-30 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852592#comment-16852592
 ] 

vinoyang commented on FLINK-12683:
--

[~klion26] you are right, thanks, will change this issue title.

> Provide task manager's location information for checkpoint acknowledge 
> message to improve the checkpoint log message
> 
>
> Key: FLINK-12683
> URL: https://issues.apache.org/jira/browse/FLINK-12683
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, the {{AcknowledgeCheckpoint}} does not contain the task manager's 
> location information. When a task's snapshot task sends an ack message to the 
> coordinator, we can only log this message:
> {code:java}
> Received late message for now expired checkpoint attempt 6035 from 
> ccd88d08bf82245f3466c9480fb5687a of job 775ef8ff0159b071da7804925bbd362f.
> {code}
> Sometimes we need to get this sub task's location information to do the 
> further debug work, e.g. stack trace dump. But, without the location 
> information, It will not help to quickly locate the problem.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner

2019-05-30 Thread GitBox
flinkbot commented on issue #8578: [FLINK-12685] [table-planner-blink] Supports 
UNNEST query in blink planner
URL: https://github.com/apache/flink/pull/8578#issuecomment-497551523
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe opened a new pull request #8578: [FLINK-12685] [table-planner-blink] Supports UNNEST query in blink planner

2019-05-30 Thread GitBox
godfreyhe opened a new pull request #8578: [FLINK-12685] [table-planner-blink] 
Supports UNNEST query in blink planner
URL: https://github.com/apache/flink/pull/8578
 
 
   
   
   ## What is the purpose of the change
   
   *Supports UNNEST query in blink planner*
   
   
   ## Brief change log
   
 - *added LogicalUnnestRule that rewrites UNNEST to explode function*
 - *added NothingType that is a placeholder internal type for Nothing type*
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
 - *Added test that validates the logical plan and physical plan after 
LogicalUnnestRule applied*
 - *Extended integration test for UNNEST queries*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ **not documented**)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12685) Supports UNNEST query in blink planner

2019-05-30 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12685:
---
Labels: pull-request-available  (was: )

> Supports UNNEST query in blink planner
> --
>
> Key: FLINK-12685
> URL: https://issues.apache.org/jira/browse/FLINK-12685
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
>  Labels: pull-request-available
>
> this issue aims to support queries with the UNNEST keyword, which relates to 
> nested fields.
> for example: 
> table name: 
> MyTable
> schema: 
> a: int, b int, c array[int]
> sql:
> SELECT a, b, s FROM MyTable, UNNEST(MyTable.c) AS A (s)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] klion26 commented on a change in pull request #8573: [FLINK-12670][runtime] Implement FailureRateRestartBackoffTimeStrategy

2019-05-30 Thread GitBox
klion26 commented on a change in pull request #8573: [FLINK-12670][runtime] 
Implement FailureRateRestartBackoffTimeStrategy
URL: https://github.com/apache/flink/pull/8573#discussion_r289237692
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureRateRestartBackoffTimeStrategyTest.java
 ##
 @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link FailureRateRestartBackoffTimeStrategy}.
+ */
+public class FailureRateRestartBackoffTimeStrategyTest extends TestLogger {
+
+   private final Exception failure = new Exception();
 
 Review comment:
   Do we need to add a test for `backoffTimeMS` and the class 
`FailureRateRestartBackoffTimeStrategyFactory`?
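   For instance, a minimal test for the backoff time could look like this (a sketch only; the factory test would follow the same shape, and it needs a static import of `org.junit.Assert.assertEquals`):
   ```java
   @Test
   public void testBackoffTime() {
   	long expectedBackoffTimeMS = 10_000L;
   	FailureRateRestartBackoffTimeStrategy strategy =
   		new FailureRateRestartBackoffTimeStrategy(3, 60_000L, expectedBackoffTimeMS);
   
   	// the configured backoff time should be returned unchanged
   	assertEquals(expectedBackoffTimeMS, strategy.getBackoffTime());
   }
   ```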


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] klion26 commented on a change in pull request #8573: [FLINK-12670][runtime] Implement FailureRateRestartBackoffTimeStrategy

2019-05-30 Thread GitBox
klion26 commented on a change in pull request #8573: [FLINK-12670][runtime] 
Implement FailureRateRestartBackoffTimeStrategy
URL: https://github.com/apache/flink/pull/8573#discussion_r289237526
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureRateRestartBackoffTimeStrategy.java
 ##
 @@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover.flip1;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+
+import scala.concurrent.duration.Duration;
+
+import java.util.ArrayDeque;
+
+import static 
org.apache.flink.configuration.ConfigConstants.RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL;
+import static 
org.apache.flink.configuration.ConfigConstants.RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL;
+import static 
org.apache.flink.configuration.ConfigConstants.RESTART_BACKOFF_TIME_STRATEGY_RESTART_BACKOFF_TIME;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Restart strategy which can restart when failure rate is not exceeded.
+ */
+public class FailureRateRestartBackoffTimeStrategy implements 
RestartBackoffTimeStrategy {
+
+   private final long failuresIntervalMS;
+
+   private final long backoffTimeMS;
+
+   private final int maxFailuresPerInterval;
+
+   private final ArrayDeque<Long> failureTimestamps;
+
+   private final String strategyString;
+
+   public FailureRateRestartBackoffTimeStrategy(int 
maxFailuresPerInterval, long failuresInterval, long backoffTimeMS) {
+
+   checkArgument(maxFailuresPerInterval > 0, "Maximum number of 
restart attempts per time unit must be greater than 0.");
+   checkArgument(failuresInterval > 0, "Failures interval must be 
greater than 0 ms.");
+   checkArgument(backoffTimeMS >= 0, "Backoff time must be at 
least 0 ms.");
+
+   this.failuresIntervalMS = failuresInterval;
+   this.backoffTimeMS = backoffTimeMS;
+   this.maxFailuresPerInterval = maxFailuresPerInterval;
+   this.failureTimestamps = new 
ArrayDeque<>(maxFailuresPerInterval);
+   this.strategyString = generateStrategyString();
+   }
+
+   @Override
+   public boolean canRestart() {
+   if (isFailureTimestampsQueueFull()) {
+   Long now = System.currentTimeMillis();
+   Long earliestFailure = failureTimestamps.peek();
+
+   return (now - earliestFailure) > failuresIntervalMS;
+   } else {
+   return true;
+   }
+   }
+
+   @Override
+   public long getBackoffTime() {
+   return backoffTimeMS;
+   }
+
+   @Override
+   public void notifyFailure(Throwable cause) {
+   if (isFailureTimestampsQueueFull()) {
+   failureTimestamps.remove();
+   }
+   failureTimestamps.add(System.currentTimeMillis());
+   }
+
+   @Override
+   public String toString() {
+   return strategyString;
+   }
+
+   private boolean isFailureTimestampsQueueFull() {
+   return failureTimestamps.size() >= maxFailuresPerInterval;
+   }
+
+   private String generateStrategyString() {
+   StringBuilder buf = new 
StringBuilder("FailureRateRestartBackoffTimeStrategy(");
+   buf.append("failuresIntervalMS=");
+   buf.append(failuresIntervalMS);
+   buf.append(",backoffTimeMS=");
+   buf.append(backoffTimeMS);
+   buf.append(",maxFailuresPerInterval=");
+   buf.append(maxFailuresPerInterval);
+   buf.append(")");
+
+   return buf.toString();
+   }
+
+   public static FailureRateRestartBackoffTimeStrategyFactory 
createFactory(final Configuration configuration) {
+   return new 

[GitHub] [flink] sunjincheng121 commented on issue #8474: [FLINK-12409][python] Adds from_elements in TableEnvironment

2019-05-30 Thread GitBox
sunjincheng121 commented on issue #8474: [FLINK-12409][python] Adds 
from_elements in TableEnvironment
URL: https://github.com/apache/flink/pull/8474#issuecomment-497548692
 
 
   Thanks for the quick updated! @dianfu 
   LGTM. 
   From my side, +1 to merged.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12683) Provide task manager's location information for checkpoint acknowledge message to improve the checkpoint log message

2019-05-30 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852572#comment-16852572
 ] 

Congxian Qiu(klion26) commented on FLINK-12683:
---

Please have a look at FLINK-1165.:)

> Provide task manager's location information for checkpoint acknowledge 
> message to improve the checkpoint log message
> 
>
> Key: FLINK-12683
> URL: https://issues.apache.org/jira/browse/FLINK-12683
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, the {{AcknowledgeCheckpoint}} does not contain the task manager's 
> location information. When a task's snapshot task sends an ack message to the 
> coordinator, we can only log this message:
> {code:java}
> Received late message for now expired checkpoint attempt 6035 from 
> ccd88d08bf82245f3466c9480fb5687a of job 775ef8ff0159b071da7804925bbd362f.
> {code}
> Sometimes we need to get this sub task's location information to do the 
> further debug work, e.g. stack trace dump. But, without the location 
> information, It will not help to quickly locate the problem.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12683) Provide task manager's location information for checkpoint acknowledge message to improve the checkpoint log message

2019-05-30 Thread Congxian Qiu(klion26) (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852572#comment-16852572
 ] 

Congxian Qiu(klion26) edited comment on FLINK-12683 at 5/31/19 2:03 AM:


Please have a look at FLINK-11165.:)


was (Author: klion26):
Please have a look at FLINK-1165.:)

> Provide task manager's location information for checkpoint acknowledge 
> message to improve the checkpoint log message
> 
>
> Key: FLINK-12683
> URL: https://issues.apache.org/jira/browse/FLINK-12683
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, the {{AcknowledgeCheckpoint}} does not contain the task manager's 
> location information. When a task's snapshot task sends an ack message to the 
> coordinator, we can only log this message:
> {code:java}
> Received late message for now expired checkpoint attempt 6035 from 
> ccd88d08bf82245f3466c9480fb5687a of job 775ef8ff0159b071da7804925bbd362f.
> {code}
> Sometimes we need to get this sub task's location information to do the 
> further debug work, e.g. stack trace dump. But, without the location 
> information, It will not help to quickly locate the problem.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12685) Supports UNNEST query in blink planner

2019-05-30 Thread godfrey he (JIRA)
godfrey he created FLINK-12685:
--

 Summary: Supports UNNEST query in blink planner
 Key: FLINK-12685
 URL: https://issues.apache.org/jira/browse/FLINK-12685
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he


this issue aims to support queries with the UNNEST keyword, which relates to nested 
fields.
for example: 
table name: 
MyTable

schema: 
a: int, b int, c array[int]

sql:
SELECT a, b, s FROM MyTable, UNNEST(MyTable.c) AS A (s)
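
For illustration, with hypothetical data the query would unnest each array element into its own row:
{code}
-- hypothetical rows in MyTable
(1, 10, [100, 200])
(2, 20, [300])

-- result of the UNNEST query above
(1, 10, 100)
(1, 10, 200)
(2, 20, 300)
{code}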



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures

2019-05-30 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852565#comment-16852565
 ] 

sunjincheng commented on FLINK-10455:
-

Hi [~cslotterback], thanks for your reply, and do not worry about your level of 
experience with Flink contributions; everyone starts from scratch. [~becket_qin] 
is a Kafka PMC member, so he has more experience with Kafka. I assigned the 
ticket to him, but I think you can share your thoughts here and work with 
[~becket_qin] and all contributors who care about this JIRA. We can grow and 
progress together, what do you think? :) 

> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.7.0, 1.7.3, 1.9.0, 1.8.1
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10455) Potential Kafka producer leak in case of failures

2019-05-30 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng reassigned FLINK-10455:
---

Assignee: Jiangjie Qin

> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.7.0, 1.7.3, 1.9.0, 1.8.1
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures

2019-05-30 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852563#comment-16852563
 ] 

sunjincheng commented on FLINK-10455:
-

Hi [~becket_qin], I am very glad that you are willing to help with this issue. 
I have already assigned this ticket to you :) , thanks!

> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.7.0, 1.7.3, 1.9.0, 1.8.1
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] danny0405 commented on a change in pull request #8548: [FLINK-6962] [table] Add create(drop) table SQL DDL

2019-05-30 Thread GitBox
danny0405 commented on a change in pull request #8548: [FLINK-6962] [table] Add 
create(drop) table SQL DDL
URL: https://github.com/apache/flink/pull/8548#discussion_r289230491
 
 

 ##
 File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlWatermarkStrategy.java
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.utils.SqlTimeUnit;
+
+import org.apache.calcite.sql.SqlWriter;
+
+/** Watermark strategies for create table DDL. **/
+public enum SqlWatermarkStrategy {
 
 Review comment:
   Yeah, let's do it in another patch.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8567: [FLINK-12676][table][sql-client] Add descriptor, validator, and factory of GenericInMemoryCatalog for table discovery service

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8567: 
[FLINK-12676][table][sql-client] Add descriptor, validator, and factory of 
GenericInMemoryCatalog for table discovery service
URL: https://github.com/apache/flink/pull/8567#discussion_r289169055
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java
 ##
 @@ -35,26 +36,49 @@
 
private final int propertyVersion;
 
+   private final String defaultDatabase;
+
/**
 * Constructs a {@link CatalogDescriptor}.
 *
 * @param type string that identifies this catalog
 * @param propertyVersion property version for backwards compatibility
 */
public CatalogDescriptor(String type, int propertyVersion) {
+   this(type, propertyVersion, null);
+   }
+
+   /**
+* Constructs a {@link CatalogDescriptor}.
+*
+* @param type string that identifies this catalog
+* @param propertyVersion property version for backwards compatibility
+* @param defaultDatabase default database of the catalog
+*/
+   public CatalogDescriptor(String type, int propertyVersion, String 
defaultDatabase) {
this.type = type;
this.propertyVersion = propertyVersion;
+   this.defaultDatabase = defaultDatabase;
}
 
@Override
	public final Map<String, String> toProperties() {
final DescriptorProperties properties = new 
DescriptorProperties();
properties.putString(CATALOG_TYPE, type);
properties.putLong(CATALOG_PROPERTY_VERSION, propertyVersion);
+
+   if (defaultDatabase != null) {
+   properties.putString(CATALOG_DEFAULT_DATABASE, 
defaultDatabase);
+   }
+
properties.putProperties(toCatalogProperties());
return properties.asMap();
}
 
+   public String getDefaultDatabase() {
+   return defaultDatabase;
+   }
+
/**
 * Converts this descriptor into a set of catalog properties.
 */
 
 Review comment:
   this pattern is borrowed from existing descriptors. `toProperties()` converts 
the common configs shared by all catalogs, and `toCatalogProperties()` converts 
the catalog-specific configs. Implementations of `toCatalogProperties()` will 
always call `super.toProperties()` and then run their own logic.
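   Going by the final `toProperties()` shown in the diff above, a concrete descriptor would roughly look like this (class name and property key are made up for illustration, and the usual `Map`/`DescriptorProperties` imports are assumed):
   ```java
   public class MyCatalogDescriptor extends CatalogDescriptor {
   
   	public MyCatalogDescriptor() {
   		super("my-catalog", 1, "default_db");
   	}
   
   	@Override
   	protected Map<String, String> toCatalogProperties() {
   		// only catalog-specific keys; the common keys are added by the final toProperties()
   		DescriptorProperties properties = new DescriptorProperties();
   		properties.putString("my-catalog.connection-string", "...");
   		return properties.asMap();
   	}
   }
   ```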


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8567: [FLINK-12676][table][sql-client] Add descriptor, validator, and factory of GenericInMemoryCatalog for table discovery service

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8567: 
[FLINK-12676][table][sql-client] Add descriptor, validator, and factory of 
GenericInMemoryCatalog for table discovery service
URL: https://github.com/apache/flink/pull/8567#discussion_r289166409
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
 ##
 @@ -73,12 +73,12 @@ public void testExecutionConfig() throws Exception {
 
@Test
public void testCatalogs() throws Exception {
-   final String catalogName = "catalog1";
+   final String catalogName = "inmemorycatalog";
final ExecutionContext context = 
createCatalogExecutionContext();
final TableEnvironment tableEnv = 
context.createEnvironmentInstance().getTableEnvironment();
 
assertEquals(tableEnv.getCurrentCatalog(), catalogName);
-   assertEquals(tableEnv.getCurrentDatabase(), "mydatabase");
+   assertEquals(tableEnv.getCurrentDatabase(), 
"test-default-database");
 
 Review comment:
   reverted


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8567: [FLINK-12676][table][sql-client] Add descriptor, validator, and factory of GenericInMemoryCatalog for table discovery service

2019-05-30 Thread GitBox
bowenli86 commented on a change in pull request #8567: 
[FLINK-12676][table][sql-client] Add descriptor, validator, and factory of 
GenericInMemoryCatalog for table discovery service
URL: https://github.com/apache/flink/pull/8567#discussion_r289167578
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java
 ##
 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.StringUtils;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * This interface is responsible for reading and writing metadata such as 
database/table/views/UDFs
+ * from a registered catalog. It connects a registered catalog and Flink's 
Table API.
+ */
+@PublicEvolving
+public abstract class AbstractCatalog implements Catalog {
+   private final String catalogName;
+   private final String defaultDatabase;
+
+   public AbstractCatalog(String catalogName, String defaultDatabase) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
+
+   this.catalogName = catalogName;
+   this.defaultDatabase = defaultDatabase;
+   }
+
+   public String getCatalogName() {
 
 Review comment:
   make sense. renamed it


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12684) User defined configuration for per job clusters

2019-05-30 Thread Richard Moorhead (JIRA)
Richard Moorhead created FLINK-12684:


 Summary: User defined configuration for per job clusters
 Key: FLINK-12684
 URL: https://issues.apache.org/jira/browse/FLINK-12684
 Project: Flink
  Issue Type: New Feature
  Components: Command Line Client, Runtime / Configuration
Reporter: Richard Moorhead


We have a use case where we need to be able to provide job-specific 
configuration to a metrics reporter. In other words, metrics reporting needs to 
be configured on a per-job basis rather than through the static configuration 
derived from the Flink installation. In general, would having the ability to 
provide user-defined config (overrides) at job submission time be a reasonable 
feature?
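
For context, one shape this could take is passing dynamic properties at submission time, e.g. for a per-job YARN cluster (the reporter class and the jobTag key below are hypothetical; only the `-yD` mechanism and the `metrics.reporter.<name>.*` key pattern are existing Flink features):
{code}
flink run -m yarn-cluster \
  -yD metrics.reporter.my_reporter.class=org.example.MyReporter \
  -yD metrics.reporter.my_reporter.jobTag=my-job \
  ./my-job.jar
{code}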



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10817) Upgrade presto dependency to support path-style access

2019-05-30 Thread Alexander Fedulov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852258#comment-16852258
 ] 

Alexander Fedulov commented on FLINK-10817:
---

[~aljoscha] do you see anything that would block this fix from being backported 
onto 1.7? We have a requirement/request for this.

> Upgrade presto dependency to support path-style access
> --
>
> Key: FLINK-10817
> URL: https://issues.apache.org/jira/browse/FLINK-10817
> Project: Flink
>  Issue Type: Improvement
>Reporter: Adam Lamar
>Assignee: Aljoscha Krettek
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to use any given non-AWS s3 implementation backed by the presto s3 
> filesystem, it is necessary to set at least one configuration parameter in 
> flink-conf.yaml:
>  * presto.s3.endpoint: https://example.com
> This appears to work as expected for hosted s3 alternatives.
> In order to use a bring-your-own, self-hosted s3 alternative like 
> [minio|https://www.minio.io/], at least two configuration parameters are 
> required:
>  * presto.s3.endpoint: https://example.com
>  * presto.s3.path-style-access: true
> However, the second path-style-access parameter doesn't work because the 
> 0.185 version of presto doesn't support passing through that configuration 
> option to the hive s3 client.
> To work around the issue, path-style-access can be forced on the s3 client by 
> using an IP address for the endpoint (instead of a hostname). Without this 
> workaround, flink attempts to use the virtualhost-style at 
> bucketname.example.com, which fails unless the expected DNS records exist.
> To solve this problem and enable non-IP endpoints, upgrade the 
> [pom|https://github.com/apache/flink/blob/master/flink-filesystems/flink-s3-fs-presto/pom.xml#L36]
>  to at least 0.186 which includes [this 
> commit|https://github.com/prestodb/presto/commit/0707f2f21a96d2fd30953fb3fa9a9a03f03d88bd]
>  Note that 0.213 is the latest presto release.
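
For reference, the two settings discussed above would look like this in flink-conf.yaml (the endpoint is a placeholder):
{code}
presto.s3.endpoint: https://minio.example.com
presto.s3.path-style-access: true
{code}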



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on issue #8564: [FLINK-12649][hive] Add a shim layer to support multiple versions of HMS

2019-05-30 Thread GitBox
xuefuz commented on issue #8564: [FLINK-12649][hive] Add a shim layer to 
support multiple versions of HMS
URL: https://github.com/apache/flink/pull/8564#issuecomment-497440703
 
 
   For Hive versions, I think for now we can explicitly claim support only for 
specific versions like 1.2.1. In theory, all 1.2.x versions should be 
compatible. However, we haven't verified this, so let's be conservative for 
now. 
   
   On the other hand, if we eventually only support 1.2.1 and 2.3.4, then I 
guess a lot of users will be turned off because their versions may not match 
any of those.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11921) Upgrade Calcite dependency to 1.19

2019-05-30 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16852147#comment-16852147
 ] 

Rong Rong commented on FLINK-11921:
---

Hi [~twalthr], [~danny0405] and I briefly discussed this: we will skip 1.19 and 
go directly to 1.20. I will rename this issue to an upgrade to 1.20 if that's all 
good with everyone (since the dependencies and todos are still valid).

> Upgrade Calcite dependency to 1.19
> --
>
> Key: FLINK-11921
> URL: https://issues.apache.org/jira/browse/FLINK-11921
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: Rong Rong
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Umbrella issue for all tasks related to the next Calcite upgrade.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8567: [FLINK-12676][table][sql-client] Add descriptor, validator, and factory of GenericInMemoryCatalog for table discovery service

2019-05-30 Thread GitBox
xuefuz commented on a change in pull request #8567: 
[FLINK-12676][table][sql-client] Add descriptor, validator, and factory of 
GenericInMemoryCatalog for table discovery service
URL: https://github.com/apache/flink/pull/8567#discussion_r289084623
 
 

 ##
 File path: 
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
 ##
 @@ -73,12 +73,12 @@ public void testExecutionConfig() throws Exception {
 
@Test
public void testCatalogs() throws Exception {
-   final String catalogName = "catalog1";
+   final String catalogName = "inmemorycatalog";
final ExecutionContext context = 
createCatalogExecutionContext();
final TableEnvironment tableEnv = 
context.createEnvironmentInstance().getTableEnvironment();
 
assertEquals(tableEnv.getCurrentCatalog(), catalogName);
-   assertEquals(tableEnv.getCurrentDatabase(), "mydatabase");
+   assertEquals(tableEnv.getCurrentDatabase(), 
"test-default-database");
 
 Review comment:
   Nit: can we be consistent on how we define the two string variables?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8567: [FLINK-12676][table][sql-client] Add descriptor, validator, and factory of GenericInMemoryCatalog for table discovery service

2019-05-30 Thread GitBox
xuefuz commented on a change in pull request #8567: 
[FLINK-12676][table][sql-client] Add descriptor, validator, and factory of 
GenericInMemoryCatalog for table discovery service
URL: https://github.com/apache/flink/pull/8567#discussion_r289079158
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java
 ##
 @@ -35,26 +36,49 @@
 
private final int propertyVersion;
 
+   private final String defaultDatabase;
+
/**
 * Constructs a {@link CatalogDescriptor}.
 *
 * @param type string that identifies this catalog
 * @param propertyVersion property version for backwards compatibility
 */
public CatalogDescriptor(String type, int propertyVersion) {
+   this(type, propertyVersion, null);
+   }
+
+   /**
+* Constructs a {@link CatalogDescriptor}.
+*
+* @param type string that identifies this catalog
+* @param propertyVersion property version for backwards compatibility
+* @param defaultDatabase default database of the catalog
+*/
+   public CatalogDescriptor(String type, int propertyVersion, String 
defaultDatabase) {
this.type = type;
this.propertyVersion = propertyVersion;
+   this.defaultDatabase = defaultDatabase;
}
 
@Override
	public final Map<String, String> toProperties() {
final DescriptorProperties properties = new 
DescriptorProperties();
properties.putString(CATALOG_TYPE, type);
properties.putLong(CATALOG_PROPERTY_VERSION, propertyVersion);
+
+   if (defaultDatabase != null) {
+   properties.putString(CATALOG_DEFAULT_DATABASE, 
defaultDatabase);
+   }
+
properties.putProperties(toCatalogProperties());
return properties.asMap();
}
 
+   public String getDefaultDatabase() {
+   return defaultDatabase;
+   }
+
/**
 * Converts this descriptor into a set of catalog properties.
 */
 
 Review comment:
   Side q: what's the purpose of this method?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8577: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

2019-05-30 Thread GitBox
flinkbot commented on issue #8577: [FLINK-10921] [kinesis] Shard watermark 
synchronization in Kinesis consumer
URL: https://github.com/apache/flink/pull/8577#issuecomment-497399565
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tweise opened a new pull request #8577: [FLINK-10921] [kinesis] Shard watermark synchronization in Kinesis consumer

2019-05-30 Thread GitBox
tweise opened a new pull request #8577: [FLINK-10921] [kinesis] Shard watermark 
synchronization in Kinesis consumer
URL: https://github.com/apache/flink/pull/8577
 
 
   Backport https://github.com/apache/flink/pull/8517


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8567: [FLINK-12676][table][sql-client] Add descriptor, validator, and factory of GenericInMemoryCatalog for table discovery service

2019-05-30 Thread GitBox
xuefuz commented on a change in pull request #8567: 
[FLINK-12676][table][sql-client] Add descriptor, validator, and factory of 
GenericInMemoryCatalog for table discovery service
URL: https://github.com/apache/flink/pull/8567#discussion_r289070336
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/CatalogDescriptor.java
 ##
 @@ -35,26 +36,49 @@
 
private final int propertyVersion;
 
+   private final String defaultDatabase;
+
/**
 * Constructs a {@link CatalogDescriptor}.
 *
 * @param type string that identifies this catalog
 * @param propertyVersion property version for backwards compatibility
 */
public CatalogDescriptor(String type, int propertyVersion) {
+   this(type, propertyVersion, null);
+   }
+
+   /**
+* Constructs a {@link CatalogDescriptor}.
+*
+* @param type string that identifies this catalog
+* @param propertyVersion property version for backwards compatibility
+* @param defaultDatabase default database of the catalog
+*/
+   public CatalogDescriptor(String type, int propertyVersion, String 
defaultDatabase) {
this.type = type;
 
 Review comment:
   check null or empty?
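   For example, the constructor could validate the argument the same way `AbstractCatalog` does (a sketch only; it assumes static imports of `checkArgument` and the `StringUtils` utility already used elsewhere in this PR):
   ```java
   public CatalogDescriptor(String type, int propertyVersion, String defaultDatabase) {
   	// reject null, empty, or whitespace-only catalog types early
   	checkArgument(!StringUtils.isNullOrWhitespaceOnly(type), "type cannot be null or empty");
   
   	this.type = type;
   	this.propertyVersion = propertyVersion;
   	this.defaultDatabase = defaultDatabase;
   }
   ```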


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8567: [FLINK-12676][table][sql-client] Add descriptor, validator, and factory of GenericInMemoryCatalog for table discovery service

2019-05-30 Thread GitBox
xuefuz commented on a change in pull request #8567: 
[FLINK-12676][table][sql-client] Add descriptor, validator, and factory of 
GenericInMemoryCatalog for table discovery service
URL: https://github.com/apache/flink/pull/8567#discussion_r289069456
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java
 ##
 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.StringUtils;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * This interface is responsible for reading and writing metadata such as 
database/table/views/UDFs
+ * from a registered catalog. It connects a registered catalog and Flink's 
Table API.
+ */
+@PublicEvolving
+public abstract class AbstractCatalog implements Catalog {
+   private final String catalogName;
+   private final String defaultDatabase;
+
+   public AbstractCatalog(String catalogName, String defaultDatabase) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
+
+   this.catalogName = catalogName;
+   this.defaultDatabase = defaultDatabase;
+   }
+
+   public String getCatalogName() {
 
 Review comment:
   While not introduced here, I think we could remove "catalog" from the method and variable names, since these belong to the Catalog class. The difference is "catalog.getName()" vs "catalog.getCatalogName()", with the former being more natural.
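   For illustration, a hedged sketch of what the suggested rename might look like; these names are the proposal under discussion, not the code currently in the PR:
   ```java
   // Sketch of the proposed rename (argument checks omitted for brevity).
   public abstract class AbstractCatalog implements Catalog {
       private final String name;
       private final String defaultDatabase;

       public AbstractCatalog(String name, String defaultDatabase) {
           this.name = name;
           this.defaultDatabase = defaultDatabase;
       }

       // Call sites then read catalog.getName() rather than catalog.getCatalogName().
       public String getName() {
           return name;
       }

       public String getDefaultDatabase() {
           return defaultDatabase;
       }
   }
   ```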


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8567: [FLINK-12676][table][sql-client] Add descriptor, validator, and factory of GenericInMemoryCatalog for table discovery service

2019-05-30 Thread GitBox
xuefuz commented on a change in pull request #8567: 
[FLINK-12676][table][sql-client] Add descriptor, validator, and factory of 
GenericInMemoryCatalog for table discovery service
URL: https://github.com/apache/flink/pull/8567#discussion_r289068202
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/AbstractCatalog.java
 ##
 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.StringUtils;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * This interface is responsible for reading and writing metadata such as database/table/views/UDFs
 
 Review comment:
   I think we need to update the comment here.
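   One possible rewording, since `AbstractCatalog` is an abstract class rather than an interface (suggested text only; the final wording is up to the author):
   ```java
   /**
    * Abstract base class for catalogs. A catalog is responsible for reading and
    * writing metadata such as databases, tables, views, and UDFs of a registered
    * catalog, and connects that catalog to Flink's Table API.
    */
   public abstract class AbstractCatalog implements Catalog {
       // ...
   }
   ```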


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8320: [FLINK-12201][network, metrics] Introduce InputGateWithMetrics in Task to increment numBytesIn metric

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8320: 
[FLINK-12201][network,metrics] Introduce InputGateWithMetrics in Task to 
increment numBytesIn metric
URL: https://github.com/apache/flink/pull/8320#discussion_r289016052
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 ##
 @@ -227,39 +229,22 @@ protected ResultPartitionWriter createResultPartition(
}
 
private InputGate createInputGate(
-   IntermediateDataSetID dataSetID,
-   ExecutionAttemptID executionAttemptID,
final TaskManagerLocation senderLocation,
-   NetworkEnvironment environment,
+   final SingleInputGateFactory gateFactory,
 
 Review comment:
   I also noticed this issue before, but since all the fields in this class are defined as protected, I kept it consistent with the others.
   As for the class field `factory`, I pass it the same way as the `channels` field because it is not initialized in the constructor. Actually, I think many of the other fields in this class cannot be passed in either.
   Do you think we should refactor this further?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #8518: [FLINK-12564][network] Remove getBufferProvider method from ResultPartitionWriter

2019-05-30 Thread GitBox
zhijiangW commented on issue #8518: [FLINK-12564][network] Remove 
getBufferProvider method from ResultPartitionWriter
URL: https://github.com/apache/flink/pull/8518#issuecomment-497348585
 
 
   @zentol Thanks for the review. I pushed a fixup commit removing `getBufferProvider` from `ResultPartition`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8518: [FLINK-12564][network] Remove getBufferProvider method from ResultPartitionWriter

2019-05-30 Thread GitBox
zhijiangW commented on a change in pull request #8518: [FLINK-12564][network] 
Remove getBufferProvider method from ResultPartitionWriter
URL: https://github.com/apache/flink/pull/8518#discussion_r289011719
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 ##
 @@ -188,7 +190,7 @@ public int getNumberOfSubpartitions() {
return subpartitions.length;
}
 
-   @Override
+   @VisibleForTesting
public BufferProvider getBufferProvider() {
 
 Review comment:
   It cannot be made package-private, but we could remove it and use `getBufferPool` instead. I can submit a fixup for that.
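   A rough sketch of the direction described here, exposing the pool directly for tests; the `bufferPool` field and the exact shape of the fixup are assumed from the surrounding diff, not taken from the actual commit:
   ```java
   // Before (the override removed from ResultPartitionWriter in this PR):
   // public BufferProvider getBufferProvider() { return bufferPool; }

   // Sketch: let tests ask for the pool itself instead of the provider view.
   @VisibleForTesting
   public BufferPool getBufferPool() {
       return bufferPool;   // assumes ResultPartition keeps a bufferPool field
   }
   ```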


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on issue #8474: [FLINK-12409][python] Adds from_elements in TableEnvironment

2019-05-30 Thread GitBox
dianfu commented on issue #8474: [FLINK-12409][python] Adds from_elements in 
TableEnvironment
URL: https://github.com/apache/flink/pull/8474#issuecomment-497346537
 
 
   @sunjincheng121 Thanks a lot for your review. I have updated the PR accordingly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8474: [FLINK-12409][python] Adds from_elements in TableEnvironment

2019-05-30 Thread GitBox
dianfu commented on a change in pull request #8474: [FLINK-12409][python] Adds 
from_elements in TableEnvironment
URL: https://github.com/apache/flink/pull/8474#discussion_r289010188
 
 

 ##
 File path: docs/ops/cli.md
 ##
 @@ -100,40 +100,40 @@ These examples about how to submit a job in CLI.
 
 -   Run Python Table program:
 
-./bin/flink run -py examples/python/table/batch/word_count.py -j 

+./bin/flink run -py examples/python/table/batch/word_count.py -C 
 -C 
 
 Review comment:
   Makes sense to me, as flink-python-*-java-binding.jar is always needed for Python jobs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

