[GitHub] [flink] rmetzger commented on issue #8303: [FLINK-12343]add file replication config for yarn configuration

2019-05-07 Thread GitBox
rmetzger commented on issue #8303: [FLINK-12343]add file replication config for 
yarn configuration
URL: https://github.com/apache/flink/pull/8303#issuecomment-490370551
 
 
   @xintongsong Thanks a lot for taking a look at the PR.
   Isn't it a good thing (in this case), that the `copyFromLocalFile` blocks 
until the file has been replicated? If it happens asynchronously, we might run 
into a situation where the files are not yet replicated, and the deployment of 
the YARN cluster won't benefit from a higher replication.
   
> It is easy to control which files should be affected while preserve the 
dfs default replication for the others, by passing an argument into the utility.
   
   Of course, we should only set the replication factor of the files uploaded 
by the YARN client and AM, needed for the deployment, not the operation of 
Flink.
   
   This should be validated through a test in Flink.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW removed a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-07 Thread GitBox
zhijiangW removed a comment on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-490348518
 
 
   Yes, I think we are on the same page now.
   
   I would focus on `b, c` in this PR, and launch separate PRs for the other 
cases future. Especially for `DataConnectionException` we might add the retry 
mechanism if possible during connecting the server and could throw 
`PartitionNotFoundException` or `DataConnectionException` after retry fails.
   
   I already updated the codes in two aspects:
   
   -  Not transform the received `PartitionNotFoundException` on consumer side.
   
   - Send the `PartitionNotFoundException` on producer side in the process of 
creating reader view. If the `ResultPartition` is not removed from 
`ResultPartitionManager`, only the `SpillableSubpartition#createView` might 
throw `IOException` which could indicate the required data file not open 
correctly. So I wrap the `PartitionNotFoundException` in upper layer which 
seems unified for all the `ResultSubpartition` instances besides the  new 
`BoundedBlockingSubpartition` stephan proposed.
   
   After you confirm this way is correct, then I would add new unit tests for 
covering these cases. :) 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12376) GCS runtime exn: Request payload size exceeds the limit

2019-05-07 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835366#comment-16835366
 ] 

Stephan Ewen commented on FLINK-12376:
--

This is actually not a GCS / checkpoint issue, it is an issue in the source 
(probably PubSub connector?)

The checkpoint completes, then Flink notifies the source that the checkpoint is 
complete and the source task acks some IDs back. That ack message is too large 
for the PubSub client's RPC service.

I think we need to rethink how the PubSub source works. Seems that keeping the 
IDS and acknowledging a large number or records is not feasible in a stable way.

I am not a PubSub expert, but is there a way to keep something like a sequence 
number (or vector of sequence numbers), similar to Kafka's offsets? 

> GCS runtime exn: Request payload size exceeds the limit
> ---
>
> Key: FLINK-12376
> URL: https://issues.apache.org/jira/browse/FLINK-12376
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Affects Versions: 1.7.2
> Environment: FROM flink:1.8.0-scala_2.11
> ARG version=0.17
> ADD 
> https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar
>  /opt/flink/lib
> COPY target/analytics-${version}.jar /opt/flink/lib/analytics.jar
>Reporter: Henrik
>Priority: Major
> Attachments: Screenshot 2019-04-30 at 22.32.34.png
>
>
> I'm trying to use the google cloud storage file system, but it would seem 
> that the FLINK / GCS client libs are creating too-large requests far down in 
> the GCS Java client.
> The Java client is added to the lib folder with this command in Dockerfile 
> (probably 
> [hadoop2-1.9.16|https://search.maven.org/artifact/com.google.cloud.bigdataoss/gcs-connector/hadoop2-1.9.16/jar]
>  at the time of writing):
>  
> {code:java}
> ADD 
> https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar
>  /opt/flink/lib{code}
> This is the crash output. Focus lines:
> {code:java}
> java.lang.RuntimeException: Error while confirming checkpoint{code}
> and
> {code:java}
>  Caused by: com.google.api.gax.rpc.InvalidArgumentException: 
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size 
> exceeds the limit: 524288 bytes.{code}
> Full stacktrace:
>  
> {code:java}
> [analytics-867c867ff6-l622h taskmanager] 2019-04-30 20:23:14,532 INFO  
> org.apache.flink.runtime.taskmanager.Task - Source: 
> Custom Source -> Process -> Timestamps/Watermarks -> app_events (1/1) 
> (9a01e96c0271025d5ba73b735847cd4c) switched from RUNNING to FAILED.
> [analytics-867c867ff6-l622h taskmanager] java.lang.RuntimeException: Error 
> while confirming checkpoint
> [analytics-867c867ff6-l622h taskmanager]     at 
> org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1211)
> [analytics-867c867ff6-l622h taskmanager]     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [analytics-867c867ff6-l622h taskmanager]     at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [analytics-867c867ff6-l622h taskmanager]     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [analytics-867c867ff6-l622h taskmanager]     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [analytics-867c867ff6-l622h taskmanager]     at 
> java.lang.Thread.run(Thread.java:748)
> [analytics-867c867ff6-l622h taskmanager] Caused by: 
> com.google.api.gax.rpc.InvalidArgumentException: 
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Request payload size 
> exceeds the limit: 524288 bytes.
> [analytics-867c867ff6-l622h taskmanager]     at 
> com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:49)
> [analytics-867c867ff6-l622h taskmanager]     at 
> com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
> [analytics-867c867ff6-l622h taskmanager]     at 
> com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
> [analytics-867c867ff6-l622h taskmanager]     at 
> com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
> [analytics-867c867ff6-l622h taskmanager]     at 
> com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
> [analytics-867c867ff6-l622h taskmanager]     at 
> com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1056)
> [analytics-867c867ff6-l622h taskmanager]     at 
> com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
> [analytics-867c867ff6-l622h taskmanager]     at 
> com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1138)
> [analytics-867c867ff6-l622h taskmanager]     at 
> com.googl

[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-07 Thread GitBox
eaglewatcherwb commented on a change in pull request #8290: [FLINK-12070] 
[network] Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#discussion_r281906582
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+import 
org.apache.flink.shaded.netty4.io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess;
+
+import sun.misc.Unsafe;
+
+import javax.annotation.Nullable;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Utility for accessing the system page size.
+ */
+final class PageSizeUtil {
+
+   /** Value indicating an unknown page size. */
+   public static final int PAGE_SIZE_UNKOWN = -1;
+
+   /** The default page size on most system. */
+   public static final int DEFAULT_PAGE_SIZE = 4 * 1024;
+
+   /** A conservative fallback value (64 KiBytes) that should be a 
multiple of the page size even
+* in some uncommon cases of servers installations with 
larger-than-usual page sizes. */
+   public static final int CONSERVATIVE_PAGE_SIZE_MULTIPLE = 64 * 1024;
+
+   /**
+* Tries to get the system page size. If the page size cannot be 
determined, this
+* returns -1.
+*
+* This internally relies on the presence of "unsafe" and the 
resolution via some
+* Netty utilities.
+*/
+   public static int getSystemPageSize() {
+   try {
+   return PageSizeUtilInternal.getSystemPageSize();
+   }
+   catch (Throwable t) {
+   ExceptionUtils.rethrowIfFatalError(t);
+   return PAGE_SIZE_UNKOWN;
+   }
+   }
+
+   /**
+* Tries to get the system page size. If the page size cannot be 
determined, this
+* returns the {@link #DEFAULT_PAGE_SIZE}.
+*/
+   public static int getSystemPageSizeOrDefault() {
+   final int pageSize = getSystemPageSize();
+   return pageSize == PAGE_SIZE_UNKOWN ? DEFAULT_PAGE_SIZE : 
pageSize;
+   }
+
+   /**
+* Tries to get the system page size. If the page size cannot be 
determined, this
+* returns the {@link #CONSERVATIVE_PAGE_SIZE_MULTIPLE}.
+*/
+   public static int getSystemPageSizeOrConservativeMultiple() {
 
 Review comment:
   access of package-private is enough


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-07 Thread GitBox
eaglewatcherwb commented on a change in pull request #8290: [FLINK-12070] 
[network] Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#discussion_r281905867
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
 ##
 @@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The reader (read view) of a BoundedBlockingSubpartition.
+ */
+class BoundedBlockingSubpartitionReader implements ResultSubpartitionView {
+
+   /** The result subpartition that we read. */
+   private final BoundedBlockingSubpartition parent;
+
+   /** Further byte buffers, to handle cases where there is more data than 
fits into
+* one mapped byte buffer (2GB = Integer.MAX_VALUE). */
+   private final Iterator furtherData;
+
+   /** The reader/decoder to the memory mapped region with the data we 
currently read from.
+* Max 2GB large. Further regions may be in the {@link #furtherData} 
field.
+* Null once the reader empty or disposed.*/
+   @Nullable
+   private BoundedBlockingSubpartitionMemory.Reader data;
+
+   /** The next buffer (look ahead). Null once the data is depleted or 
reader is disposed. */
+   @Nullable
+   private Buffer nextBuffer;
+
+   /** The remaining number of data buffers (not events) in the result. */
+   private int bufferBacklog;
+
+   /** Flag whether this reader is released. Atomic, to avoid double 
release. */
+   private boolean isReleased;
+
+   /**
+* Convenience constructor that takes a single buffer.
+*/
+   BoundedBlockingSubpartitionReader(
 
 Review comment:
   never used constructor


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-07 Thread GitBox
eaglewatcherwb commented on a change in pull request #8290: [FLINK-12070] 
[network] Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#discussion_r281906373
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+import 
org.apache.flink.shaded.netty4.io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess;
+
+import sun.misc.Unsafe;
+
+import javax.annotation.Nullable;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Utility for accessing the system page size.
+ */
+final class PageSizeUtil {
+
+   /** Value indicating an unknown page size. */
+   public static final int PAGE_SIZE_UNKOWN = -1;
 
 Review comment:
   typo, should be `PAGE_SIZE_UNKNOWN`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-07 Thread GitBox
eaglewatcherwb commented on a change in pull request #8290: [FLINK-12070] 
[network] Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#discussion_r281906483
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent;
+import 
org.apache.flink.shaded.netty4.io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess;
+
+import sun.misc.Unsafe;
+
+import javax.annotation.Nullable;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Utility for accessing the system page size.
+ */
+final class PageSizeUtil {
+
+   /** Value indicating an unknown page size. */
+   public static final int PAGE_SIZE_UNKOWN = -1;
+
+   /** The default page size on most system. */
+   public static final int DEFAULT_PAGE_SIZE = 4 * 1024;
+
+   /** A conservative fallback value (64 KiBytes) that should be a 
multiple of the page size even
+* in some uncommon cases of servers installations with 
larger-than-usual page sizes. */
+   public static final int CONSERVATIVE_PAGE_SIZE_MULTIPLE = 64 * 1024;
+
+   /**
+* Tries to get the system page size. If the page size cannot be 
determined, this
+* returns -1.
+*
+* This internally relies on the presence of "unsafe" and the 
resolution via some
+* Netty utilities.
+*/
+   public static int getSystemPageSize() {
+   try {
+   return PageSizeUtilInternal.getSystemPageSize();
+   }
+   catch (Throwable t) {
+   ExceptionUtils.rethrowIfFatalError(t);
+   return PAGE_SIZE_UNKOWN;
+   }
+   }
+
+   /**
+* Tries to get the system page size. If the page size cannot be 
determined, this
+* returns the {@link #DEFAULT_PAGE_SIZE}.
+*/
+   public static int getSystemPageSizeOrDefault() {
 
 Review comment:
   never used function


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb commented on a change in pull request #8290: [FLINK-12070] [network] Implement new bounded blocking subpartitions

2019-05-07 Thread GitBox
eaglewatcherwb commented on a change in pull request #8290: [FLINK-12070] 
[network] Implement new bounded blocking subpartitions
URL: https://github.com/apache/flink/pull/8290#discussion_r281906191
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionMemory.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Putting and getting of buffers to/from a region of memory.
+ * This class handles the headers, length encoding, memory slicing.
+ */
+final class BoundedBlockingSubpartitionMemory {
+
+   // all fields and methods below here have package-private access to 
avoid bridge
+   // methods when accessing them from the nested classes
+
+   static final int HEADER_LENGTH = 8;
+
+   static final int HEADER_VALUE_IS_BUFFER = 0;
+
+   static final int HEADER_VALUE_IS_EVENT = 1;
+
+   static ByteBuffer checkAndConfigureByteBuffer(ByteBuffer buffer) {
+   checkArgument(buffer.position() == 0);
+   checkArgument(buffer.capacity() > 8);
+   checkArgument(buffer.limit() == buffer.capacity());
+
+   return buffer.order(ByteOrder.nativeOrder());
+   }
+
+   // 

+
+   static final class Writer {
+
+   private final ByteBuffer memory;
+
+   Writer(ByteBuffer memory) {
+   this.memory = checkAndConfigureByteBuffer(memory);
+   }
+
+   public boolean writeBuffer(Buffer buffer) {
+   final ByteBuffer memory = this.memory;
+   final int bufferSize = buffer.getSize();
+
+   if (memory.remaining() < bufferSize + HEADER_LENGTH) {
+   return false;
+   }
+
+   memory.putInt(buffer.isBuffer() ? 
HEADER_VALUE_IS_BUFFER : HEADER_VALUE_IS_EVENT);
+   memory.putInt(bufferSize);
+   memory.put(buffer.getNioBufferReadable());
+   return true;
+   }
+
+   public ByteBuffer complete() {
+   memory.flip();
+   return memory;
+   }
+
+   public int getNumBytes() {
+   return memory.position();
+   }
+   }
+
+   static final class Reader {
+
+   private final ByteBuffer memory;
+
+   Reader(ByteBuffer memory) {
+   this.memory = checkAndConfigureByteBuffer(memory);
+   }
+
+   @Nullable
+   public Buffer sliceNextBuffer() {
+   final ByteBuffer memory = this.memory;
 
 Review comment:
   why use temporary variable `memory`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12379) Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint

2019-05-07 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835361#comment-16835361
 ] 

Stephan Ewen commented on FLINK-12379:
--

This looks like the same issue as FLINK-12381

Jobs do not overwrite existing jobs' checkpoints and usually fence paths 
through jobID. Here, the jobID is all zero for some reason. Can you share the 
setup?

> Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint
> 
>
> Key: FLINK-12379
> URL: https://issues.apache.org/jira/browse/FLINK-12379
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Coordination
>Affects Versions: 1.8.0
> Environment: GCS +
>  
> {code:java}
> 1.8.0
> 1.8
> 2.11{code}
> {code:java}
> 
> 
> 
> 
>   com.google.cloud.bigdataoss
>   gcs-connector
>   hadoop2-1.9.16
> 
> 
>   org.apache.flink
>   flink-connector-filesystem_2.11
>   ${flink.version}
> 
> 
>   org.apache.flink
>   flink-hadoop-fs
>   ${flink.version}
> 
> 
> 
>   org.apache.flink
>   flink-shaded-hadoop2
>   ${hadoop.version}-${flink.version}
> 
> {code}
>  
>  
>Reporter: Henrik
>Priority: Major
>
> When running one standalone-job w/ parallelism=1 + one taskmanager, you will 
> shortly get this crash
> {code:java}
> 2019-04-30 22:20:02,928 WARN  org.apache.flink.runtime.jobmaster.JobMaster
>   - Error while processing checkpoint acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize 
> the pending checkpoint 5.
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> 'gs://example_bucket/flink/checkpoints//chk-5/_metadata'
>  already exists
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
>     at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
>     ... 8 more
> Caused by: java.nio.file.FileAlreadyExistsException: Object 
> gs://example_bucket/flink/checkpoints//chk-5/_metadata
>  already exists.
>     at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getWriteGeneration(GoogleCloudStorageImpl.java:1918)
>     at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.create(GoogleCloudStorageImpl.java:410)
>     at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.createInternal(GoogleCloudStorageFileSystem.java:286)
>     at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:264)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:82)
>     ... 19 more
> 2019-04-30 22:20:03,114 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 6 @ 1556662802928 for job 00

[jira] [Commented] (FLINK-12381) W/o HA, upon a full restart, checkpointing crashes

2019-05-07 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835360#comment-16835360
 ] 

Stephan Ewen commented on FLINK-12381:
--

Flink does not overwrite checkpoints, because this could lead to one job 
causing another job data loss.

Paths between jobs should be different due to different JobIDs. Can you share 
your setup, specifically why your job ID is all zero?

> W/o HA, upon a full restart, checkpointing crashes
> --
>
> Key: FLINK-12381
> URL: https://issues.apache.org/jira/browse/FLINK-12381
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.8.0
> Environment: Same as FLINK-\{12379, 12377, 12376}
>Reporter: Henrik
>Priority: Major
>
> {code:java}
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> 'gs://example_bucket/flink/checkpoints//chk-16/_metadata'
>  already exists
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
>     at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
>     ... 8 more
> {code}
> Instead, it should either just overwrite the checkpoint or fail to start the 
> job completely. Partial and undefined failure is not what should happen.
>  
> Repro:
>  # Set up a single purpose job cluster (which could use much better docs btw!)
>  # Let it run with GCS checkpointing for a while with rocksdb/gs://example
>  # Kill it
>  # Start it



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 edited a comment on issue #8340: [FLINK-12395][table] Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces

2019-05-07 Thread GitBox
bowenli86 edited a comment on issue #8340: [FLINK-12395][table] Add more 
detailed javadoc for getDescription() and getDetailedDescription() in catalog 
object interfaces
URL: https://github.com/apache/flink/pull/8340#issuecomment-490366314
 
 
   Hi @xuefuz , https://issues.apache.org/jira/browse/FLINK-12395 contains the 
discussion with @twalthr  and @dawidwys 
   
   What do you mean by "DDL output"?
   
   I think the output can depend on either the client or the object itself. The 
latter one will actually be easier and more controllable in a central place. In 
this case,  `getDescription()` and `getDetailedDescription()` are similar to 
`toString()` of an object, and "DESCRIBE" is like a print out.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8340: [FLINK-12395][table] Add more detailed javadoc for getDescription() and getDetailedDescription() in catalog object interfaces

2019-05-07 Thread GitBox
bowenli86 commented on issue #8340: [FLINK-12395][table] Add more detailed 
javadoc for getDescription() and getDetailedDescription() in catalog object 
interfaces
URL: https://github.com/apache/flink/pull/8340#issuecomment-490366314
 
 
   Discussed with @twalthr  and @dawidwys  in 
https://issues.apache.org/jira/browse/FLINK-12395
   
   What do you mean by "DDL output"?
   
   I think the output can depend on either the client or the object itself. The 
latter one will actually be easier and more controllable in a central place. In 
this case,  `getDescription()` and `getDetailedDescription()` are similar to 
`toString()` of an object, and "DESCRIBE" is like a print out.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12425) Implement RPCs to allow clients release result partitions in a Flink cluster.

2019-05-07 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835354#comment-16835354
 ] 

vinoyang commented on FLINK-12425:
--

OK, it doesn't matter.

> Implement RPCs to allow clients release result partitions in a Flink cluster.
> -
>
> Key: FLINK-12425
> URL: https://issues.apache.org/jira/browse/FLINK-12425
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Jiangjie Qin
>Assignee: Ruidong Li
>Priority: Major
>
> This ticket implements the following:
>  # An RPC between client and dispatcher to release result partitions.
>  # An RPC from dispatcher to RM to release result partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-07 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r281928748
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final AtomicInteger continuousFailureCounter;
+   private final int tolerableCpFailureNumber;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber) {
+   Preconditions.checkArgument(tolerableCpFailureNumber > 0,
+   "The tolerable checkpoint failure number must be larger 
than 0.");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param callback the handler callback which defines the process logic.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
FailureHandlerCallback callback) {
 
 Review comment:
   I am sorry the change of the implementation makes you confused. The reason I 
introduced a callback interface because we must make the "decision" (let job 
fail or something else). Although we have not distinguished the `Trigger` and 
`Execution` phase. But there are still two different behavior: `sync`(method 
call in trigger phase) and `async`(message in execution phase). So I think the 
`handleCheckpoint` in design doc is not enough. We should track the two phases 
of failure (`triggerCheckpoint` and `abort`). What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-07 Thread GitBox
yanghua commented on issue #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#issuecomment-490365680
 
 
   @StefanRRichter Really sorry for the late reply, I just took a holiday and 
attended QCon conference Beijing. I have given a more detailed implementation 
so that we could discuss more easily. The new commit cannot be passed by 
Travis, it is for discussion reason. When we agree about the result, I will 
complete this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-07 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r281928748
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final AtomicInteger continuousFailureCounter;
+   private final int tolerableCpFailureNumber;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber) {
+   Preconditions.checkArgument(tolerableCpFailureNumber > 0,
+   "The tolerable checkpoint failure number must be larger 
than 0.");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param callback the handler callback which defines the process logic.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
FailureHandlerCallback callback) {
 
 Review comment:
   I am sorry the change of the implementation make you confused. The reason I 
introduced a callback interface because we must make the "decision" (let job 
fail or something else). Although we have not distinguished the `Trigger` and 
`Execution` phase. But there are still two different behavior: `sync`(method 
call in trigger phase) and `async`(message in execution phase). So I think the 
`handleCheckpoint` in design doc is not enough. We should track the two phases 
of failure. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-07 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r281927266
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final AtomicInteger continuousFailureCounter;
+   private final int tolerableCpFailureNumber;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber) {
+   Preconditions.checkArgument(tolerableCpFailureNumber > 0,
+   "The tolerable checkpoint failure number must be larger 
than 0.");
+   this.tolerableCpFailureNumber = tolerableCpFailureNumber;
+   this.continuousFailureCounter = new AtomicInteger(0);
+   }
+
+   /**
+* Handle checkpoint exception with a handler callback.
+*
+* @param exception the checkpoint exception.
+* @param callback the handler callback which defines the process logic.
+*/
+   public void handleCheckpointException(CheckpointException exception, 
FailureHandlerCallback callback) {
+   CheckpointFailureReason reason = 
exception.getCheckpointFailureReason();
+   switch (reason) {
+   case PERIODIC_SCHEDULER_SHUTDOWN:
+   case ALREADY_QUEUED:
+   case TOO_MANY_CONCURRENT_CHECKPOINTS:
+   case MINIMUM_TIME_BETWEEN_CHECKPOINTS:
+   case NOT_ALL_REQUIRED_TASKS_RUNNING:
+   case CHECKPOINT_SUBSUMED:
+   case CHECKPOINT_COORDINATOR_SUSPEND:
+   case CHECKPOINT_COORDINATOR_SHUTDOWN:
+   case JOB_FAILURE:
+   case JOB_FAILOVER_REGION:
+   //ignore
+   break;
+
+   case EXCEPTION:
+   case CHECKPOINT_EXPIRED:
+   case CHECKPOINT_DECLINED:
+   case TASK_CHECKPOINT_FAILURE:
+   case TRIGGER_CHECKPOINT_FAILURE:
+   case FINALIZE_CHECKPOINT_FAILURE:
+   continuousFailureCounter.incrementAndGet();
+   break;
+
+   default:
+   throw new RuntimeException("Unknown checkpoint 
failure reason : " + reason.name());
+   }
+
+   if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
+   callback.process();
+   }
+   }
+
+   /**
+* Handle checkpoint success.
+*/
+   public void handleCheckpointSuccess() {
+   continuousFailureCounter.set(0);
 
 Review comment:
   The checkpoint can be executed parallelly. I think the 
"continuousFailureCounter" will just tract the invokable order, not checkpoint 
sequence number. What do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12440) Add all connector support align Java Table API

2019-05-07 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng reassigned FLINK-12440:
---

Assignee: (was: sunjincheng)

> Add all connector support align Java Table API
> --
>
> Key: FLINK-12440
> URL: https://issues.apache.org/jira/browse/FLINK-12440
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Priority: Major
>
> Add all connector support align Java Table API. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#issuecomment-490362402
 
 
   Thanks @xuefuz . The reason I use util methods is because as the logic 
complexity grows, the size of two catalog classes can be quite big. I'm fine to 
remove util methods and keep shared logic in HiveCatalogBase and special logic 
of each catalog in its own class now, and we can reevaluate whenever necessary.
   
   Looking at each commit would probably be easier. To summarize the changes:
   
   - moved shared logic to HiveCatalogBase
   - added a few interfaces in HiveCatalogBase for subclasses to implement
   - updated test util and a few tests to cater to above changes


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12365) Add stats related catalog APIs

2019-05-07 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-12365.
--
Resolution: Fixed

merged in 1.9.0: 12d698b6e1c1aef03834963fba8a0974b3729c50

> Add stats related catalog APIs
> --
>
> Key: FLINK-12365
> URL: https://issues.apache.org/jira/browse/FLINK-12365
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This is to support  (table and column)  stats for table/partition with 
> related to catalog.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] KurtYoung merged pull request #8314: [FLINK-12365][table] Add stats related catalog APIs

2019-05-07 Thread GitBox
KurtYoung merged pull request #8314: [FLINK-12365][table] Add stats related 
catalog APIs
URL: https://github.com/apache/flink/pull/8314
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-07 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r281923773
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
 ##
 @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The checkpoint failure manager which centralized manage checkpoint failure 
processing logic.
+ */
+public class CheckpointFailureManager {
+
+   private final AtomicInteger continuousFailureCounter;
+   private final int tolerableCpFailureNumber;
+
+   public CheckpointFailureManager(int tolerableCpFailureNumber) {
+   Preconditions.checkArgument(tolerableCpFailureNumber > 0,
 
 Review comment:
   Agree, it's my negligence.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281922013
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +204,140 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   protected void createHiveTable(ObjectPath tablePath, Table table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(table);
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist. Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists. Thus, check the table existence 
explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   protected Table getHiveTable(ObjectPath tablePath) throws 
TableNotExistException {
+   try {
+   return client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+   } catch (NoSuchObjectException e) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to get table %s from Hive 
metastore", tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } else {
+   // IMetastoreClient.alter_table() requires the table to 
have a valid location, which it doesn't in this case
+   // Thus we have to translate alterTable() into 
(dropTable() + createTable())
+   dropTable(tablePath, false);
 
 Review comment:
   good catch, fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281922282
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
 ##
 @@ -19,11 +19,15 @@
 package org.apache.flink.table.catalog.hive;
 
 /**
- * Configs for Flink tables stored in Hive metastore.
+ * Configs for tables in Hive metastore.
  */
 public class HiveTableConfig {
 
-   // Description of the Flink table
+   // ---
 
 Review comment:
   removed


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8367: [hotfix][doc] typo, remove `one` from `value of one two...`

2019-05-07 Thread GitBox
flinkbot commented on issue #8367: [hotfix][doc] typo, remove `one` from `value 
of one two...`
URL: https://github.com/apache/flink/pull/8367#issuecomment-490357186
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] robstoll opened a new pull request #8367: [hotfix][doc] typo, remove `one` from `value of one two...`

2019-05-07 Thread GitBox
robstoll opened a new pull request #8367: [hotfix][doc] typo, remove `one` from 
`value of one two...`
URL: https://github.com/apache/flink/pull/8367
 
 
   ## What is the purpose of the change
   typo
   
   ## Brief change log
   see changes
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) => no
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12425) Implement RPCs to allow clients release result partitions in a Flink cluster.

2019-05-07 Thread Jiangjie Qin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned FLINK-12425:


Assignee: Ruidong Li

> Implement RPCs to allow clients release result partitions in a Flink cluster.
> -
>
> Key: FLINK-12425
> URL: https://issues.apache.org/jira/browse/FLINK-12425
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Jiangjie Qin
>Assignee: Ruidong Li
>Priority: Major
>
> This ticket implements the following:
>  # An RPC between client and dispatcher to release result partitions.
>  # An RPC from dispatcher to RM to release result partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12425) Implement RPCs to allow clients release result partitions in a Flink cluster.

2019-05-07 Thread Jiangjie Qin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835330#comment-16835330
 ] 

Jiangjie Qin commented on FLINK-12425:
--

[~yanghua] Thanks for being interested in working on this. I think [~RuidongLi] 
already had a patch ready. I forgot to assign the ticket to him when I create 
the issue. Sorry for the confusion.

> Implement RPCs to allow clients release result partitions in a Flink cluster.
> -
>
> Key: FLINK-12425
> URL: https://issues.apache.org/jira/browse/FLINK-12425
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Jiangjie Qin
>Priority: Major
>
> This ticket implements the following:
>  # An RPC between client and dispatcher to release result partitions.
>  # An RPC from dispatcher to RM to release result partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12379) Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint

2019-05-07 Thread Robert Metzger (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-12379:
---
Component/s: Runtime / Coordination

> Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint
> 
>
> Key: FLINK-12379
> URL: https://issues.apache.org/jira/browse/FLINK-12379
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Coordination
>Affects Versions: 1.8.0
> Environment: GCS +
>  
> {code:java}
> 1.8.0
> 1.8
> 2.11{code}
> {code:java}
> 
> 
> 
> 
>   com.google.cloud.bigdataoss
>   gcs-connector
>   hadoop2-1.9.16
> 
> 
>   org.apache.flink
>   flink-connector-filesystem_2.11
>   ${flink.version}
> 
> 
>   org.apache.flink
>   flink-hadoop-fs
>   ${flink.version}
> 
> 
> 
>   org.apache.flink
>   flink-shaded-hadoop2
>   ${hadoop.version}-${flink.version}
> 
> {code}
>  
>  
>Reporter: Henrik
>Priority: Major
>
> When running one standalone-job w/ parallelism=1 + one taskmanager, you will 
> shortly get this crash
> {code:java}
> 2019-04-30 22:20:02,928 WARN  org.apache.flink.runtime.jobmaster.JobMaster
>   - Error while processing checkpoint acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize 
> the pending checkpoint 5.
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> 'gs://example_bucket/flink/checkpoints//chk-5/_metadata'
>  already exists
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
>     at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
>     ... 8 more
> Caused by: java.nio.file.FileAlreadyExistsException: Object 
> gs://example_bucket/flink/checkpoints//chk-5/_metadata
>  already exists.
>     at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getWriteGeneration(GoogleCloudStorageImpl.java:1918)
>     at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.create(GoogleCloudStorageImpl.java:410)
>     at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.createInternal(GoogleCloudStorageFileSystem.java:286)
>     at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:264)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:82)
>     ... 19 more
> 2019-04-30 22:20:03,114 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 6 @ 1556662802928 for job .{code}
> My guess at why; concurrent checkpoint writers are updating the _metadata 
> resource concurrently. They should be using optimistic concurrency control 
> with ETag on GCS, and then

[jira] [Updated] (FLINK-12381) W/o HA, upon a full restart, checkpointing crashes

2019-05-07 Thread Robert Metzger (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-12381:
---
Component/s: Runtime / Coordination

> W/o HA, upon a full restart, checkpointing crashes
> --
>
> Key: FLINK-12381
> URL: https://issues.apache.org/jira/browse/FLINK-12381
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.8.0
> Environment: Same as FLINK-\{12379, 12377, 12376}
>Reporter: Henrik
>Priority: Major
>
> {code:java}
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> 'gs://example_bucket/flink/checkpoints//chk-16/_metadata'
>  already exists
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
>     at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
>     ... 8 more
> {code}
> Instead, it should either just overwrite the checkpoint or fail to start the 
> job completely. Partial and undefined failure is not what should happen.
>  
> Repro:
>  # Set up a single purpose job cluster (which could use much better docs btw!)
>  # Let it run with GCS checkpointing for a while with rocksdb/gs://example
>  # Kill it
>  # Start it



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12384) Rolling the etcd servers causes "Connected to an old server; r-o mode will be unavailable"

2019-05-07 Thread Robert Metzger (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-12384:
---
Component/s: Runtime / Coordination

> Rolling the etcd servers causes "Connected to an old server; r-o mode will be 
> unavailable"
> --
>
> Key: FLINK-12384
> URL: https://issues.apache.org/jira/browse/FLINK-12384
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Henrik
>Priority: Major
>
> {code:java}
> [tm] 2019-05-01 13:30:53,316 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - 
> Initiating client connection, connectString=analytics-zetcd:2181 
> sessionTimeout=6 
> watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@5c8eee0f
> [tm] 2019-05-01 13:30:53,384 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
> configuration failed: javax.security.auth.login.LoginException: No JAAS 
> configuration section named 'Client' was found in specified JAAS 
> configuration file: '/tmp/jaas-3674237213070587877.conf'. Will continue 
> connection to Zookeeper server without SASL authentication, if Zookeeper 
> server allows it.
> [tm] 2019-05-01 13:30:53,395 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
> socket connection to server 
> analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181
> [tm] 2019-05-01 13:30:53,395 INFO  
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Using 
> configured hostname/address for TaskManager: 10.1.2.173.
> [tm] 2019-05-01 13:30:53,401 ERROR 
> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
> Authentication failed
> [tm] 2019-05-01 13:30:53,418 INFO  
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to 
> start actor system at 10.1.2.173:0
> [tm] 2019-05-01 13:30:53,420 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket 
> connection established to 
> analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, initiating 
> session
> [tm] 2019-05-01 13:30:53,500 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocket  - 
> Connected to an old server; r-o mode will be unavailable
> [tm] 2019-05-01 13:30:53,500 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
> establishment complete on server 
> analytics-zetcd.default.svc.cluster.local/10.108.52.97:2181, sessionid = 
> 0xbf06a739001d446, negotiated timeout = 6
> [tm] 2019-05-01 13:30:53,525 INFO  
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>   - State change: CONNECTED{code}
> Repro:
> Start an etcd-cluster, with e.g. etcd-operator, with three members. Start 
> zetcd in front. Configure the sesssion cluster to go against zetcd.
> Ensure the job can start successfully.
> Now, kill the etcd pods one by one, letting the quorum re-establish in 
> between, so that the cluster is still OK.
> Now restart the job/tm pods. You'll end up in this no-mans-land.
>  
> ---
> Workaround: clean out the etcd cluster and remove all its data, however, this 
> resets all time windows and state, despite having that saved in GCS, so it's 
> a crappy workaround.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12326) Add a basic test framework, just like the existing Java TableAPI, abstract some TestBase.

2019-05-07 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12326.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master: 84eec21108f2c05fa872c9a3735457d73f75dc51

> Add a basic test framework, just like the existing Java TableAPI, abstract 
> some TestBase.
> -
>
> Key: FLINK-12326
> URL: https://issues.apache.org/jira/browse/FLINK-12326
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Add a basic test framework, just like the existing Java/Scala TableAPI, 
> abstract some TestBase.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-07 Thread GitBox
asfgit closed pull request #8347: [FLINK-12326][python] Add basic test 
framework for python api
URL: https://github.com/apache/flink/pull/8347
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on issue #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-07 Thread GitBox
sunjincheng121 commented on issue #8347: [FLINK-12326][python] Add basic test 
framework for python api
URL: https://github.com/apache/flink/pull/8347#issuecomment-490351312
 
 
   Thanks for the quick update! @dianfu 
   +1 to merged.
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on issue #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-07 Thread GitBox
dianfu commented on issue #8347: [FLINK-12326][python] Add basic test framework 
for python api
URL: https://github.com/apache/flink/pull/8347#issuecomment-490350804
 
 
   @sunjincheng121 Thanks a lot for the review. The latest comments make sense. 
Have updated the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dianfu commented on a change in pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-07 Thread GitBox
dianfu commented on a change in pull request #8347: [FLINK-12326][python] Add 
basic test framework for python api
URL: https://github.com/apache/flink/pull/8347#discussion_r281915661
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_table.py
 ##
 @@ -0,0 +1,70 @@
+# 
###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+
+from pyflink.table.table_source import CsvTableSource
+from pyflink.table.types import DataTypes
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+
+
+class TableTests(PyFlinkStreamTableTestCase):
 
 Review comment:
   Make sense. +1


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-07 Thread GitBox
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-490348518
 
 
   Yes, I think we are on the same page now.
   
   I would focus on `b, c` in this PR, and launch separate PRs for the other 
cases future. Especially for `DataConnectionException` we might add the retry 
mechanism if possible during connecting the server and could throw 
`PartitionNotFoundException` or `DataConnectionException` after retry fails.
   
   I already updated the codes in two aspects:
   
   -  Not transform the received `PartitionNotFoundException` on consumer side.
   
   - Send the `PartitionNotFoundException` on producer side in the process of 
creating reader view. If the `ResultPartition` is not removed from 
`ResultPartitionManager`, only the `SpillableSubpartition#createView` might 
throw `IOException` which could indicate the required data file not open 
correctly. So I wrap the `PartitionNotFoundException` in upper layer which 
seems unified for all the `ResultSubpartition` instances besides the  new 
`BoundedBlockingSubpartition` Stephan proposed.
   
   After you confirm this way is correct, then I would add new unit tests for 
covering these cases. :) 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-07 Thread GitBox
zhijiangW edited a comment on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-490348518
 
 
   Yes, I think we are on the same page now.
   
   I would focus on `b, c` in this PR, and launch separate PRs for the other 
cases future. Especially for `DataConnectionException` we might add the retry 
mechanism if possible during connecting the server and could throw 
`PartitionNotFoundException` or `DataConnectionException` after retry fails.
   
   I already updated the codes in two aspects:
   
   -  Not transform the received `PartitionNotFoundException` on consumer side.
   
   - Send the `PartitionNotFoundException` on producer side in the process of 
creating reader view. If the `ResultPartition` is not removed from 
`ResultPartitionManager`, only the `SpillableSubpartition#createView` might 
throw `IOException` which could indicate the required data file not open 
correctly. So I wrap the `PartitionNotFoundException` in upper layer which 
seems unified for all the `ResultSubpartition` instances besides the  new 
`BoundedBlockingSubpartition` stephan proposed.
   
   After you confirm this way is correct, then I would add new unit tests for 
covering these cases. :) 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12440) Add all connector support align Java Table API

2019-05-07 Thread Zhenqiu Huang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenqiu Huang reassigned FLINK-12440:
-

Assignee: sunjincheng  (was: Zhenqiu Huang)

> Add all connector support align Java Table API
> --
>
> Key: FLINK-12440
> URL: https://issues.apache.org/jira/browse/FLINK-12440
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>
> Add all connector support align Java Table API. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12440) Add all connector support align Java Table API

2019-05-07 Thread Zhenqiu Huang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenqiu Huang reassigned FLINK-12440:
-

Assignee: Zhenqiu Huang

> Add all connector support align Java Table API
> --
>
> Key: FLINK-12440
> URL: https://issues.apache.org/jira/browse/FLINK-12440
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Zhenqiu Huang
>Priority: Major
>
> Add all connector support align Java Table API. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12441) Add column stats for decimal type

2019-05-07 Thread Xuefu Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xuefu Zhang reassigned FLINK-12441:
---

Assignee: (was: Xuefu Zhang)

> Add column stats for decimal type
> -
>
> Key: FLINK-12441
> URL: https://issues.apache.org/jira/browse/FLINK-12441
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Xuefu Zhang
>Priority: Major
>
> As a followup for FLINK-12365, add column stats type for decimal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12442) Add some more (negative) test cases for FLINK-12365

2019-05-07 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created FLINK-12442:
---

 Summary: Add some more (negative) test cases for FLINK-12365
 Key: FLINK-12442
 URL: https://issues.apache.org/jira/browse/FLINK-12442
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Xuefu Zhang


As a followup for FLINK-12365, add more, especially negative, test cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-07 Thread GitBox
sunjincheng121 commented on a change in pull request #8347: 
[FLINK-12326][python] Add basic test framework for python api
URL: https://github.com/apache/flink/pull/8347#discussion_r281897668
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_table.py
 ##
 @@ -0,0 +1,70 @@
+# 
###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+
+from pyflink.table.table_source import CsvTableSource
+from pyflink.table.types import DataTypes
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+
+
+class TableTests(PyFlinkStreamTableTestCase):
 
 Review comment:
   I think we need to divide the test categorization in some way, like:
   The operator classification is as follows:
   - test_calc
   - test_aggregate
   - test_window
   - test_sort
   - test_join
   - test_connector
   So, I suggest rename the file name `test_table`->`test_calc`, What do you 
think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-07 Thread GitBox
sunjincheng121 commented on a change in pull request #8347: 
[FLINK-12326][python] Add basic test framework for python api
URL: https://github.com/apache/flink/pull/8347#discussion_r281897834
 
 

 ##
 File path: flink-python/pyflink/find_flink_home.py
 ##
 @@ -15,7 +15,9 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 

+
 from __future__ import print_function
 
 Review comment:
   remove this import?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12441) Add column stats for decimal type

2019-05-07 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created FLINK-12441:
---

 Summary: Add column stats for decimal type
 Key: FLINK-12441
 URL: https://issues.apache.org/jira/browse/FLINK-12441
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Xuefu Zhang
Assignee: Xuefu Zhang


As a followup for FLINK-12365, add column stats type for decimal.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi closed pull request #8015: [FLINK-11974][streaming] Introduce StreamOperatorSubstitutor to help table perform the whole Operator CodeGen

2019-05-07 Thread GitBox
JingsongLi closed pull request #8015: [FLINK-11974][streaming] Introduce 
StreamOperatorSubstitutor to help table perform the whole Operator CodeGen
URL: https://github.com/apache/flink/pull/8015
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-10979) Add support for group keys in Unbounded Aggregate/FlatAggregate operator

2019-05-07 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng closed FLINK-10979.
---
Resolution: Fixed

> Add support for group keys in Unbounded Aggregate/FlatAggregate operator
> 
>
> Key: FLINK-10979
> URL: https://issues.apache.org/jira/browse/FLINK-10979
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: sunjincheng
>Assignee: Wei Zhong
>Priority: Major
>
> Add support for group keys in Aggregate/FlatAggregate operator, the detail 
> will be described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10979) Add support for group keys in Unbounded Aggregate/FlatAggregate operator

2019-05-07 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835283#comment-16835283
 ] 

Hequn Cheng commented on FLINK-10979:
-

Will be implemented directly by flatAggregate/aggregate.

> Add support for group keys in Unbounded Aggregate/FlatAggregate operator
> 
>
> Key: FLINK-10979
> URL: https://issues.apache.org/jira/browse/FLINK-10979
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: sunjincheng
>Assignee: Wei Zhong
>Priority: Major
>
> Add support for group keys in Aggregate/FlatAggregate operator, the detail 
> will be described in [Google 
> doc|https://docs.google.com/document/d/1tnpxg31EQz2-MEzSotwFzqatsB4rNLz0I-l_vPa5H4Q/edit#heading=h.q23rny2iglsr].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] shuai-xu commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-07 Thread GitBox
shuai-xu commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r281905258
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
 ##
 @@ -27,7 +27,7 @@
 /**
  * Scheduling representation of {@link ExecutionVertex}.
  */
-public interface SchedulingVertex {
+public interface SchedulingExecutionVertex {
 
 Review comment:
   Yes, Gray suggested changing it to SchedulingExecutionVertex. I think both 
works. Which do you prefer? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] Aitozi commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function

2019-05-07 Thread GitBox
Aitozi commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle 
the wrapped function
URL: https://github.com/apache/flink/pull/8280#issuecomment-490336487
 
 
   Thanks @StephanEwen for your valuable suggestion, I will address your 
comments as far as i have time (about at this weekend).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] shuai-xu commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-07 Thread GitBox
shuai-xu commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r281903791
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A simple scheduling topology for testing purposes.
+ */
+public class TestingSchedulingTopology implements SchedulingTopology {
+
+   private final Map 
schedulingExecutionVertices = new HashMap<>();
+
+   private final Map schedulingResultPartitions = new HashMap<>();
+
+   @Override
+   public Iterable getVertices() {
+   return 
Collections.unmodifiableCollection(schedulingExecutionVertices.values());
+   }
+
+   @Override
+   public Optional getVertex(ExecutionVertexID 
executionVertexId)  {
+   return 
Optional.ofNullable(schedulingExecutionVertices.get(executionVertexId));
+   }
+
+   @Override
+   public Optional getResultPartition(
+   IntermediateResultPartitionID 
intermediateResultPartitionId) {
+   return 
Optional.ofNullable(schedulingResultPartitions.get(intermediateResultPartitionId));
+   }
+
+   public void addSchedulingExecutionVertex(SchedulingExecutionVertex 
schedulingExecutionVertex) {
+   
schedulingExecutionVertices.put(schedulingExecutionVertex.getId(), 
schedulingExecutionVertex);
+   
addSchedulingResultPartitions(schedulingExecutionVertex.getConsumedResultPartitions());
+   
addSchedulingResultPartitions(schedulingExecutionVertex.getProducedResultPartitions());
+   }
+
+   public void addSchedulingResultPartitions(final 
Collection resultPartitions) {
 
 Review comment:
   It maybe used by other schedulingStrategyTest, so I make it public.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8302: [FLINK-12269][table-blink] Support Temporal Table Join in blink planner and runtime

2019-05-07 Thread GitBox
JingsongLi commented on a change in pull request #8302: 
[FLINK-12269][table-blink] Support Temporal Table Join in blink planner and 
runtime
URL: https://github.com/apache/flink/pull/8302#discussion_r281903520
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/AsyncLookupJoinRunner.java
 ##
 @@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.lookup;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.table.dataformat.DataFormatConverters.RowConverter;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.generated.GeneratedFunction;
+import org.apache.flink.table.generated.GeneratedResultFuture;
+import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * The async join runner to lookup the dimension table.
+ */
+public class AsyncLookupJoinRunner extends RichAsyncFunction 
{
+   private static final long serialVersionUID = -6664660022391632480L;
+
+   private final GeneratedFunction> 
generatedFetcher;
+   private final GeneratedResultFuture> 
generatedResultFuture;
+   private final boolean isLeftOuterJoin;
+   private final int asyncBufferCapacity;
+   private final TypeInformation fetcherReturnType;
+   private final BaseRowTypeInfo rightRowTypeInfo;
+
+   private transient AsyncFunction fetcher;
+
+   /**
+* Buffers {@link ResultFuture} to avoid newInstance cost when 
processing elements every time.
+* We use {@link BlockingQueue} to make sure the head {@link 
ResultFuture}s are available.
+*/
+   private transient BlockingQueue 
resultFutureBuffer;
+
+   public AsyncLookupJoinRunner(
+   GeneratedFunction> 
generatedFetcher,
+   
GeneratedResultFuture> generatedResultFuture,
+   TypeInformation fetcherReturnType,
+   BaseRowTypeInfo rightRowTypeInfo,
+   boolean isLeftOuterJoin,
+   int asyncBufferCapacity) {
+   this.generatedFetcher = generatedFetcher;
+   this.generatedResultFuture = generatedResultFuture;
+   this.isLeftOuterJoin = isLeftOuterJoin;
+   this.asyncBufferCapacity = asyncBufferCapacity;
+   this.fetcherReturnType = fetcherReturnType;
+   this.rightRowTypeInfo = rightRowTypeInfo;
+   }
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+   this.fetcher = 
generatedFetcher.newInstance(getRuntimeContext().getUserCodeClassLoader());
+   FunctionUtils.setFunctionRuntimeContext(fetcher, 
getRuntimeContext());
+   FunctionUtils.openFunction(fetcher, parameters);
+
+   // try to compile the generated ResultFuture, fail fast if the 
code is corrupt.
+   
generatedResultFuture.compile(getRuntimeContext().getUserCodeClassLoader());
+
+   // row converter is stateless which is thread-safe
+   RowConverter rowConverter;
+   if (fetcherReturnType instanceof RowTypeInfo) {
+   

[GitHub] [flink] JingsongLi commented on a change in pull request #8302: [FLINK-12269][table-blink] Support Temporal Table Join in blink planner and runtime

2019-05-07 Thread GitBox
JingsongLi commented on a change in pull request #8302: 
[FLINK-12269][table-blink] Support Temporal Table Join in blink planner and 
runtime
URL: https://github.com/apache/flink/pull/8302#discussion_r281903477
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/join/lookup/AsyncLookupJoinRunner.java
 ##
 @@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.join.lookup;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.table.dataformat.DataFormatConverters.RowConverter;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.JoinedRow;
+import org.apache.flink.table.generated.GeneratedFunction;
+import org.apache.flink.table.generated.GeneratedResultFuture;
+import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * The async join runner to lookup the dimension table.
+ */
+public class AsyncLookupJoinRunner extends RichAsyncFunction 
{
+   private static final long serialVersionUID = -6664660022391632480L;
+
+   private final GeneratedFunction> 
generatedFetcher;
+   private final GeneratedResultFuture> 
generatedResultFuture;
+   private final boolean isLeftOuterJoin;
+   private final int asyncBufferCapacity;
+   private final TypeInformation fetcherReturnType;
+   private final BaseRowTypeInfo rightRowTypeInfo;
+
+   private transient AsyncFunction fetcher;
+
+   /**
+* Buffers {@link ResultFuture} to avoid newInstance cost when 
processing elements every time.
+* We use {@link BlockingQueue} to make sure the head {@link 
ResultFuture}s are available.
+*/
+   private transient BlockingQueue 
resultFutureBuffer;
+
+   public AsyncLookupJoinRunner(
+   GeneratedFunction> 
generatedFetcher,
+   
GeneratedResultFuture> generatedResultFuture,
+   TypeInformation fetcherReturnType,
+   BaseRowTypeInfo rightRowTypeInfo,
+   boolean isLeftOuterJoin,
+   int asyncBufferCapacity) {
+   this.generatedFetcher = generatedFetcher;
+   this.generatedResultFuture = generatedResultFuture;
+   this.isLeftOuterJoin = isLeftOuterJoin;
+   this.asyncBufferCapacity = asyncBufferCapacity;
+   this.fetcherReturnType = fetcherReturnType;
+   this.rightRowTypeInfo = rightRowTypeInfo;
+   }
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   super.open(parameters);
+   this.fetcher = 
generatedFetcher.newInstance(getRuntimeContext().getUserCodeClassLoader());
+   FunctionUtils.setFunctionRuntimeContext(fetcher, 
getRuntimeContext());
+   FunctionUtils.openFunction(fetcher, parameters);
+
+   // try to compile the generated ResultFuture, fail fast if the 
code is corrupt.
+   
generatedResultFuture.compile(getRuntimeContext().getUserCodeClassLoader());
+
+   // row converter is stateless which is thread-safe
+   RowConverter rowConverter;
+   if (fetcherReturnType instanceof RowTypeInfo) {
+   

[GitHub] [flink] flinkbot commented on issue #8366: [FLINK-12415][doc-zh]Translate HistoryServer page into Chinese

2019-05-07 Thread GitBox
flinkbot commented on issue #8366: [FLINK-12415][doc-zh]Translate HistoryServer 
page into Chinese
URL: https://github.com/apache/flink/pull/8366#issuecomment-490334809
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12415) Translate "History Server" page into Chinese

2019-05-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12415:
---
Labels: pull-request-available  (was: )

> Translate "History Server" page into Chinese
> 
>
> Key: FLINK-12415
> URL: https://issues.apache.org/jira/browse/FLINK-12415
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Documentation
>Reporter: Armstrong Nova
>Assignee: Armstrong Nova
>Priority: Major
>  Labels: pull-request-available
>
> Translate 
> "[https://ci.apache.org/projects/flink/flink-docs-master/monitoring/historyserver.html]";
>  page into Chinese.
> This doc located in "flink/docs/monitoring/historyserver.zh.md"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Armstrongya opened a new pull request #8366: [FLINK-12415][doc-zh]Translate HistoryServer page into Chinese

2019-05-07 Thread GitBox
Armstrongya opened a new pull request #8366: [FLINK-12415][doc-zh]Translate 
HistoryServer page into Chinese
URL: https://github.com/apache/flink/pull/8366
 
 
   
   
   ## What is the purpose of the change
   
   This change mainly translate "History Server" page into Chinese.
   
   
   ## Brief change log
   
 - *Translate History Server page into Chinese*
   
   
   ## Verifying this change
   
- *this change is to add a new translated document.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8302: [FLINK-12269][table-blink] Support Temporal Table Join in blink planner and runtime

2019-05-07 Thread GitBox
KurtYoung commented on a change in pull request #8302: 
[FLINK-12269][table-blink] Support Temporal Table Join in blink planner and 
runtime
URL: https://github.com/apache/flink/pull/8302#discussion_r281893667
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
 ##
 @@ -0,0 +1,731 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.common
+
+import com.google.common.primitives.Primitives
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.validate.SqlValidatorUtil
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TypeExtractor}
+import org.apache.flink.streaming.api.datastream.AsyncDataStream.OutputMode
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.async.ResultFuture
+import org.apache.flink.streaming.api.operators.ProcessOperator
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator
+import org.apache.flink.streaming.api.transformations.{OneInputTransformation, 
StreamTransformation}
+import org.apache.flink.table.`type`._
+import org.apache.flink.table.api.{TableConfig, TableException, TableSchema}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.LookupJoinCodeGenerator._
+import org.apache.flink.table.codegen.{CodeGeneratorContext, 
LookupJoinCodeGenerator}
+import org.apache.flink.table.dataformat.BaseRow
+import 
org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{getParamClassesConsiderVarArgs,
 getUserDefinedMethod, signatureToString, signaturesToString}
+import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction, 
UserDefinedFunction}
+import org.apache.flink.table.plan.nodes.FlinkRelNode
+import org.apache.flink.table.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.plan.util.{JoinTypeUtil, RelExplainUtil}
+import org.apache.flink.table.plan.util.LookupJoinUtil._
+import org.apache.flink.table.runtime.join.lookup.{AsyncLookupJoinRunner, 
LookupJoinRunner, AsyncLookupJoinWithCalcRunner, LookupJoinWithCalcRunner}
+import org.apache.flink.table.sources.TableIndex.IndexType
+import org.apache.flink.table.sources.{LookupConfig, LookupableTableSource, 
TableIndex, TableSource}
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.types.Row
+
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+  * Common abstract RelNode for temporal table join which shares most methods.
+  * @param input  input rel node
+  * @param tableSource  the table source to be temporal joined
+  * @param tableRowType  the row type of the table source
+  * @param calcOnTemporalTable  the calc (projection&filter) after table scan 
before joining
+  */
+abstract class CommonLookupJoin(
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+input: RelNode,
+val tableSource: TableSource[_],
+tableRowType: RelDataType,
+val calcOnTemporalTable: Option[RexProgram],
+val joinInfo: JoinInfo,
+val joinType: JoinRelType)
+  extends SingleRel(cluster, traitSet, input)
+  with FlinkRelNode {
+
+  val joinKeyPairs: Array[IntPair] = getTemporalTableJoinKeyPairs(joinInfo, 
calcOnTemporalTable)
+  val indexKeys: Array[TableIndex] = getTableIndexes(tableSource)
+  // all potential index keys, mapping from field index in table source to 
LookupKey
+  val allLookupKeys: Map[Int, LookupKey] = analyzeLookupKeys(
+cluster.getRexBuilder,
+joinKeyPairs,
+indexKeys,
+tableSource.getTableSchema,
+calcOnTemporalTable)

[GitHub] [flink] KurtYoung commented on a change in pull request #8302: [FLINK-12269][table-blink] Support Temporal Table Join in blink planner and runtime

2019-05-07 Thread GitBox
KurtYoung commented on a change in pull request #8302: 
[FLINK-12269][table-blink] Support Temporal Table Join in blink planner and 
runtime
URL: https://github.com/apache/flink/pull/8302#discussion_r281893334
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java
 ##
 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+
+/**
+ * A {@link TableSource} which supports for lookup accessing via key column(s).
+ * For example, MySQL TableSource can implement this interface to support 
lookup accessing.
+ * When temporal join this MySQL table, the runtime behavior could be in a 
lookup fashion.
+ *
+ * @param  type of the result
+ */
+@PublicEvolving
+public interface LookupableTableSource extends TableSource {
 
 Review comment:
   This interface should also implement `DefinedIndexes` and 
`DefinedPrimaryKey`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8302: [FLINK-12269][table-blink] Support Temporal Table Join in blink planner and runtime

2019-05-07 Thread GitBox
KurtYoung commented on a change in pull request #8302: 
[FLINK-12269][table-blink] Support Temporal Table Join in blink planner and 
runtime
URL: https://github.com/apache/flink/pull/8302#discussion_r281898716
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/collector/TableFunctionCollector.java
 ##
 @@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.collector;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * The basic implementation of collector for {@link TableFunction}.
+ */
+public abstract class TableFunctionCollector extends AbstractRichFunction 
implements Collector {
+
+   private static final long serialVersionUID = 1L;
+
+   private Object input;
+   private Collector collector;
+   private boolean collected;
+
+   /**
+* Sets the input row from left table,
+* which will be used to cross join with the result of table function.
+*/
+   public void setInput(Object input) {
+   this.input = input;
+   }
+
+   /**
+* Gets the input value from left table,
+* which will be used to cross join with the result of table function.
+*/
+   public Object getInput() {
+   return input;
+   }
+
+   /**
+* Sets the current collector, which used to emit the final row.
+*/
+   public void setCollector(Collector collector) {
+   this.collector = collector;
+   }
+
+   /**
+* Gets the internal collector which used to emit the final row.
+*/
+   public Collector getCollector() {
+   return collector;
+   }
+
+   /**
+* Resets the flag to indicate whether [[collect(T)]] has been called.
+*/
+   public void reset() {
+   this.collected = false;
+   }
+
+   /**
+* Whether {@link #collect(Object)} has been called.
+*
+* @return True if {@link #collect(Object)} has been called.
+*/
+   public boolean isCollected() {
+   return collected;
+   }
+
+   @Override
+   public void collect(T record) {
+   this.collected = true;
 
 Review comment:
   Shouldn't this method call `collector.collect`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8302: [FLINK-12269][table-blink] Support Temporal Table Join in blink planner and runtime

2019-05-07 Thread GitBox
KurtYoung commented on a change in pull request #8302: 
[FLINK-12269][table-blink] Support Temporal Table Join in blink planner and 
runtime
URL: https://github.com/apache/flink/pull/8302#discussion_r281898810
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/collector/TableFunctionResultFuture.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.collector;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+
+/**
+ * The basic implementation of collector for {@link ResultFuture} in table 
joining.
+ */
+public abstract class TableFunctionResultFuture extends 
AbstractRichFunction implements ResultFuture {
+
+   private static final long serialVersionUID = 1L;
+
+   private Object input;
+   private ResultFuture resultFuture;
+
+   /**
+* Sets the input row from left table,
+* which will be used to cross join with the result of right table.
+*/
+   public void setInput(Object input) {
+   this.input = input;
+   }
+
+   /**
+* Gets the input value from left table,
+* which will be used to cross join with the result of right table.
+*/
+   public Object getInput() {
+   return input;
+   }
+
+   /**
+* Sets the current collector, which used to emit the final row.
+*/
+   public void setCollector(ResultFuture resultFuture) {
 
 Review comment:
   setResultFuture?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs

2019-05-07 Thread GitBox
xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog 
APIs
URL: https://github.com/apache/flink/pull/8314#issuecomment-490331675
 
 
   Pushed fix for the test. Thanks for the review, @KurtYoung .


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8302: [FLINK-12269][table-blink] Support Temporal Table Join in blink planner and runtime

2019-05-07 Thread GitBox
KurtYoung commented on a change in pull request #8302: 
[FLINK-12269][table-blink] Support Temporal Table Join in blink planner and 
runtime
URL: https://github.com/apache/flink/pull/8302#discussion_r281900158
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/collector/TableFunctionResultFuture.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.collector;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+
+/**
+ * The basic implementation of collector for {@link ResultFuture} in table 
joining.
+ */
+public abstract class TableFunctionResultFuture extends 
AbstractRichFunction implements ResultFuture {
 
 Review comment:
   The responsibility of this class and `TableFunctionCollector` is not clean. 
It looks like they want to do some encapsulation but actually only contain some 
get and set methods. And the interface contract also seems not consistent, see 
the comment i left in `TableFunctionCollector`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8302: [FLINK-12269][table-blink] Support Temporal Table Join in blink planner and runtime

2019-05-07 Thread GitBox
KurtYoung commented on a change in pull request #8302: 
[FLINK-12269][table-blink] Support Temporal Table Join in blink planner and 
runtime
URL: https://github.com/apache/flink/pull/8302#discussion_r281899855
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/collector/TableFunctionCollector.java
 ##
 @@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.collector;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * The basic implementation of collector for {@link TableFunction}.
+ */
+public abstract class TableFunctionCollector extends AbstractRichFunction 
implements Collector {
+
+   private static final long serialVersionUID = 1L;
+
+   private Object input;
+   private Collector collector;
+   private boolean collected;
+
+   /**
+* Sets the input row from left table,
+* which will be used to cross join with the result of table function.
+*/
+   public void setInput(Object input) {
+   this.input = input;
+   }
+
+   /**
+* Gets the input value from left table,
+* which will be used to cross join with the result of table function.
+*/
+   public Object getInput() {
+   return input;
+   }
+
+   /**
+* Sets the current collector, which used to emit the final row.
+*/
+   public void setCollector(Collector collector) {
+   this.collector = collector;
+   }
+
+   /**
+* Gets the internal collector which used to emit the final row.
+*/
+   public Collector getCollector() {
+   return collector;
+   }
+
+   /**
+* Resets the flag to indicate whether [[collect(T)]] has been called.
+*/
+   public void reset() {
+   this.collected = false;
+   }
+
+   /**
+* Whether {@link #collect(Object)} has been called.
+*
+* @return True if {@link #collect(Object)} has been called.
+*/
+   public boolean isCollected() {
+   return collected;
+   }
+
+   @Override
+   public void collect(T record) {
+   this.collected = true;
 
 Review comment:
   and i just found that the sub-class of this class `RowToBaseRowCollector` is 
actually calling `  
getCollector.asInstanceOf[Collector[BaseRow]].collect(result)`, 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on a change in pull request #8302: [FLINK-12269][table-blink] Support Temporal Table Join in blink planner and runtime

2019-05-07 Thread GitBox
KurtYoung commented on a change in pull request #8302: 
[FLINK-12269][table-blink] Support Temporal Table Join in blink planner and 
runtime
URL: https://github.com/apache/flink/pull/8302#discussion_r281898829
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/collector/TableFunctionResultFuture.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.collector;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+
+/**
+ * The basic implementation of collector for {@link ResultFuture} in table 
joining.
+ */
+public abstract class TableFunctionResultFuture extends 
AbstractRichFunction implements ResultFuture {
+
+   private static final long serialVersionUID = 1L;
+
+   private Object input;
+   private ResultFuture resultFuture;
+
+   /**
+* Sets the input row from left table,
+* which will be used to cross join with the result of right table.
+*/
+   public void setInput(Object input) {
+   this.input = input;
+   }
+
+   /**
+* Gets the input value from left table,
+* which will be used to cross join with the result of right table.
+*/
+   public Object getInput() {
+   return input;
+   }
+
+   /**
+* Sets the current collector, which used to emit the final row.
+*/
+   public void setCollector(ResultFuture resultFuture) {
+   this.resultFuture = resultFuture;
+   }
+
+   /**
+* Gets the internal collector which used to emit the final row.
+*/
+   public ResultFuture getCollector() {
 
 Review comment:
   getResultFuture?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12406) Report BLOCKING_PERSISTENT result partition meta back to client

2019-05-07 Thread Ruidong Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li updated FLINK-12406:
---
Component/s: API / DataSet

> Report BLOCKING_PERSISTENT result partition meta back to client
> ---
>
> Key: FLINK-12406
> URL: https://issues.apache.org/jira/browse/FLINK-12406
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataSet, Runtime / Coordination
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> After each job finishes, the new {{BLOCKING_PERSISTENT}} result partitions 
> are generated, and locations of these result partitions should be report back 
> to client via {{JobExecutionResult}}, they will be later used for Table 
> {{cache()}} and {{invalidateCache()}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12440) Add all connector support align Java Table API

2019-05-07 Thread sunjincheng (JIRA)
sunjincheng created FLINK-12440:
---

 Summary: Add all connector support align Java Table API
 Key: FLINK-12440
 URL: https://issues.apache.org/jira/browse/FLINK-12440
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Affects Versions: 1.9.0
Reporter: sunjincheng


Add all connector support align Java Table API. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12439) Add FileSystem Connector with CSV format support in Python Table API

2019-05-07 Thread sunjincheng (JIRA)
sunjincheng created FLINK-12439:
---

 Summary: Add FileSystem Connector with CSV format support in 
Python Table API
 Key: FLINK-12439
 URL: https://issues.apache.org/jira/browse/FLINK-12439
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Affects Versions: 1.9.0
Reporter: sunjincheng


This is the first PR about the connector. In this PR, you need to add the 
descriptor interface and add the FileSystem connector and CSV format to verify 
the integrity of all interfaces. So the JIRA needs to do the following work.

1. Add all of the existing descriptor interfaces align Java Table API

2. Add FileSystem connector and CSV format support

3. Add test case and verify all test cases by run `dev/lint-python.sh`. 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-07 Thread GitBox
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r281891845
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.io.Serializable;
+
+/**
+ * A factory to create {@link StreamOperator}.
+ *
+ * @param  The output type of the operator
+ */
 
 Review comment:
   OK done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KurtYoung commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs

2019-05-07 Thread GitBox
KurtYoung commented on issue #8314: [FLINK-12365][table] Add stats related 
catalog APIs
URL: https://github.com/apache/flink/pull/8314#issuecomment-490319734
 
 
   Changes LGTM, please fix the test failure:
   21:22:05.992 [ERROR] 
testStatistics(org.apache.flink.table.catalog.GenericInMemoryCatalogTest)  Time 
elapsed: 0.011 s  <<< FAILURE!
   java.lang.AssertionError: Use assertEquals(expected, actual, delta) to 
compare floating-point numbers
at 
org.apache.flink.table.catalog.GenericInMemoryCatalogTest.testStatistics(GenericInMemoryCatalogTest.java:604)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12438) Translate "Task Lifecycle" page into Chinese

2019-05-07 Thread Armstrong Nova (JIRA)
Armstrong Nova created FLINK-12438:
--

 Summary: Translate "Task Lifecycle" page into Chinese
 Key: FLINK-12438
 URL: https://issues.apache.org/jira/browse/FLINK-12438
 Project: Flink
  Issue Type: Sub-task
  Components: chinese-translation, Documentation
Reporter: Armstrong Nova
Assignee: Armstrong Nova


Translate the internal page 
"[https://ci.apache.org/projects/flink/flink-docs-master/internals/task_lifecycle.html]";
 into Chinese.

 

The doc located in "flink/docs/internals/task_lifecycle.zh.md"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12435) UT Failures: GroupWindowTest.testExpressionOnWindowHavingFunction and GroupWindowTest.testWindowEndOnly

2019-05-07 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-12435.
--
Resolution: Invalid

> UT Failures: GroupWindowTest.testExpressionOnWindowHavingFunction and 
> GroupWindowTest.testWindowEndOnly
> ---
>
> Key: FLINK-12435
> URL: https://issues.apache.org/jira/browse/FLINK-12435
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Bowen Li
>Assignee: Kurt Young
>Priority: Major
> Fix For: 1.9.0
>
>
> https://travis-ci.org/apache/flink/jobs/529155380
> {code:java}
> 17:44:05.285 [INFO] 
> 17:44:05.285 [INFO] Results:
> 17:44:05.285 [INFO] 
> 17:44:05.285 [ERROR] Failures: 
> 17:44:05.285 [ERROR]   
> GroupWindowTest.testExpressionOnWindowHavingFunction:175 planAfter 
> expected:<...rt, w$end, w$rowtime[, w$proctime]], select=[COUNT(*) ...> but 
> was:<...rt, w$end, w$rowtime[]], select=[COUNT(*) ...>
> 17:44:05.285 [ERROR]   GroupWindowTest.testWindowEndOnly:157 planAfter 
> expected:<...rt, w$end, w$rowtime[, w$proctime]], select=[c])
>+-...> but was:<...rt, w$end, w$rowtime[]], select=[c])
>+-...>
> 17:44:05.285 [INFO] 
> {code}
> [~ykt836] please help to triage this JIRA to proper assignee



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12435) UT Failures: GroupWindowTest.testExpressionOnWindowHavingFunction and GroupWindowTest.testWindowEndOnly

2019-05-07 Thread Kurt Young (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835242#comment-16835242
 ] 

Kurt Young commented on FLINK-12435:


This class doesn't exist anymore

> UT Failures: GroupWindowTest.testExpressionOnWindowHavingFunction and 
> GroupWindowTest.testWindowEndOnly
> ---
>
> Key: FLINK-12435
> URL: https://issues.apache.org/jira/browse/FLINK-12435
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Bowen Li
>Assignee: Kurt Young
>Priority: Major
> Fix For: 1.9.0
>
>
> https://travis-ci.org/apache/flink/jobs/529155380
> {code:java}
> 17:44:05.285 [INFO] 
> 17:44:05.285 [INFO] Results:
> 17:44:05.285 [INFO] 
> 17:44:05.285 [ERROR] Failures: 
> 17:44:05.285 [ERROR]   
> GroupWindowTest.testExpressionOnWindowHavingFunction:175 planAfter 
> expected:<...rt, w$end, w$rowtime[, w$proctime]], select=[COUNT(*) ...> but 
> was:<...rt, w$end, w$rowtime[]], select=[COUNT(*) ...>
> 17:44:05.285 [ERROR]   GroupWindowTest.testWindowEndOnly:157 planAfter 
> expected:<...rt, w$end, w$rowtime[, w$proctime]], select=[c])
>+-...> but was:<...rt, w$end, w$rowtime[]], select=[c])
>+-...>
> 17:44:05.285 [INFO] 
> {code}
> [~ykt836] please help to triage this JIRA to proper assignee



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] leesf edited a comment on issue #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase

2019-05-07 Thread GitBox
leesf edited a comment on issue #8226: [FLINK-12181][Tests] Port 
ExecutionGraphRestartTest to new codebase
URL: https://github.com/apache/flink/pull/8226#issuecomment-490022810
 
 
   @azagrebin Thanks for you review and sorry for lately reply. Updated the PR 
and address your comments,  would you please review this pr in your free time? 
cc @tillrohrmann 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12327) Add simplicity support for submitting Python Table API job in CliFrontend, i.e. `flink run -py wordcount.py` can be work(with simple test).

2019-05-07 Thread Huang Xingbo (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo reassigned FLINK-12327:


Assignee: Huang Xingbo

> Add simplicity support for submitting Python Table API job in CliFrontend, 
> i.e. `flink run -py wordcount.py` can be work(with simple test).
> ---
>
> Key: FLINK-12327
> URL: https://issues.apache.org/jira/browse/FLINK-12327
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Huang Xingbo
>Priority: Major
>
> Add simplicity support for submitting Python Table API job in CliFrontend, 
> i.e. `flink run -py wordcount.py` can be work(with simple test).   
> Support for submitting Python Table API job in CliFrontend,And using `flink 
> run` submit Python Table API job. The current `flink` command command line 
> syntax is as follows:
> flink  [OPTIONS] [ARGUMENTS]
> On the basis of the current `run` ACTION, we add to Python Table API support, 
> specific OPTIONS are as follows:
> -py --python  
> Python script with the program entry point. We can configure dependent 
> resources with the `--py-files` option.
> * -pyfs --py-files
> Attach custom python files for job. Comma can be used as the separator to 
> specify multiple files. The standard python resource file suffixes such as 
> .py/.egg/.zip all also supported.
> * -pym --py-module   Python module with the program entry 
> point. This option must be used in conjunction with ` --py-files`.
> For more details, please refer to 
> [FLIP-38|https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API]
> NOTE: In this JIRA we only need to implement the basic options, without fully 
> implementing the parameters related to UDFs in FLIP-38.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-07 Thread GitBox
sunjincheng121 commented on a change in pull request #8347: 
[FLINK-12326][python] Add basic test framework for python api
URL: https://github.com/apache/flink/pull/8347#discussion_r281870466
 
 

 ##
 File path: flink-python/pyflink/testing/source_sink_utils.py
 ##
 @@ -0,0 +1,127 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import glob
+import os
+import unittest
+from py4j.java_gateway import java_import
+
+from pyflink.find_flink_home import _find_flink_source_root
+from pyflink.java_gateway import get_gateway
+from pyflink.table import TableSink
+
+
+class TestTableSink(TableSink):
+"""
+Base class for test table sink.
+"""
+
+_inited = False
+
+def __init__(self, j_table_sink):
+super(TestTableSink, self).__init__(j_table_sink)
+
+@classmethod
+def _ensure_initialized(cls):
+if TestTableSink._inited:
+return
+
+FLINK_SOURCE_ROOT_DIR = _find_flink_source_root()
+filename_pattern = (
+"flink-table/flink-table-planner/target/"
+"flink-table-planner*-tests.jar")
+if not glob.glob(os.path.join(FLINK_SOURCE_ROOT_DIR, 
filename_pattern)):
+raise unittest.SkipTest(
+"'flink-table-planner*-tests.jar' is not available. Will skip 
the related tests.")
+
+gateway = get_gateway()
+java_import(gateway.jvm, 
"org.apache.flink.table.runtime.stream.table.TestAppendSink")
+java_import(gateway.jvm, 
"org.apache.flink.table.runtime.stream.table.TestRetractSink")
+java_import(gateway.jvm, 
"org.apache.flink.table.runtime.stream.table.TestUpsertSink")
+java_import(gateway.jvm, 
"org.apache.flink.table.runtime.stream.table.RowCollector")
+
+TestTableSink._inited = True
+
+
+class TestAppendSink(TestTableSink):
+"""
+A test append table sink.
+"""
+
+def __init__(self):
+TestTableSink._ensure_initialized()
+
+gateway = get_gateway()
+super(TestAppendSink, self).__init__(gateway.jvm.TestAppendSink())
+
+
+class TestRetractSink(TestTableSink):
+"""
+A test retract table sink.
+"""
+
+def __init__(self):
+TestTableSink._ensure_initialized()
+
+gateway = get_gateway()
+super(TestRetractSink, self).__init__(gateway.jvm.TestRetractSink())
+
+
+class TestUpsertSink(TestTableSink):
+"""
+A test upsert table sink.
+"""
+
+def __init__(self, keys, is_append_only):
+TestTableSink._ensure_initialized()
+
+gateway = get_gateway()
+j_keys = gateway.new_array(gateway.jvm.String, len(keys))
+for i in xrange(0, len(keys)):
+j_keys[i] = keys[i]
+
+super(TestUpsertSink, 
self).__init__(gateway.jvm.TestUpsertSink(j_keys, is_append_only))
+
+
+def results():
+"""
+Retrieves the results from an append table sink.
+"""
+return retract_results()
+
+
+def retract_results():
+"""
+Retrieves the results from a retract table sink.
+"""
+gateway = get_gateway()
+results = gateway.jvm.RowCollector.getAndClearValues()
 
 Review comment:
   Sure!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] sunjincheng121 commented on a change in pull request #8347: [FLINK-12326][python] Add basic test framework for python api

2019-05-07 Thread GitBox
sunjincheng121 commented on a change in pull request #8347: 
[FLINK-12326][python] Add basic test framework for python api
URL: https://github.com/apache/flink/pull/8347#discussion_r281870867
 
 

 ##
 File path: flink-python/pyflink/table/tests/test_table.py
 ##
 @@ -0,0 +1,75 @@
+# 
###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import tempfile
+
+from pyflink.table.table_source import CsvTableSource
+from pyflink.table.types import DataTypes
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
+
+
+class TableTests(PyFlinkStreamTableTestCase):
+
+def test_select(self):
+tmp_dir = tempfile.gettempdir()
+source_path = tmp_dir + '/streaming.csv'
+if os.path.isfile(source_path):
+os.remove(source_path)
+with open(source_path, 'w') as f:
+lines = '1,hi,hello\n' + '2,hi,hello\n'
+f.write(lines)
+f.close()
+
+field_names = ["a", "b", "c"]
+field_types = [DataTypes.INT, DataTypes.STRING, DataTypes.STRING]
+
+t_env = self.t_env
+
+# register Orders table in table environment
+t_env.register_table_source(
+"Orders",
+CsvTableSource(source_path, field_names, field_types))
+
+t_env.register_table_sink(
+"Results",
+field_names, field_types, source_sink_utils.TestAppendSink())
+
+t_env.scan("Orders") \
+ .where("a > 0") \
+ .select("a + 1, b, c") \
+ .insert_into("Results")
+
+t_env.execute()
+
+actual = source_sink_utils.results()
+expected = ['2,hi,hello', '3,hi,hello']
+self.assert_equals(actual, expected)
+
+
+if __name__ == '__main__':
+import unittest
 
 Review comment:
   Yes, I found that. May be the Pycharm is better.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12437) Taskmanager doesn't initiate registration after jobmanager marks it terminated

2019-05-07 Thread Abdul Qadeer (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdul Qadeer updated FLINK-12437:
-
Description: 
This issue is observed in Standalone cluster deployment mode with Zookeeper HA 
enabled in Flink 1.4.0. A few taskmanagers restarted due to Out of Metaspace.
 The offending taskmanager `pipelineruntime-taskmgr-6789dd578b-dcp4r` first 
successfully registers with jobmanager, and the remote watcher marks it 
terminated soon after as seen in logs. There were other taskmanagers that were 
terminated around same time but they had been quarantined by jobmanager with 
message similar to:
{noformat}
Association to [akka.tcp://flink@10.60.5.121:8070] having UID [864976677] is 
irrecoverably failed. UID is now quarantined and all messages to this UID will 
be delivered to dead letters. Remote actorsystem must be restarted to recover 
from this situation.
{noformat}
They came back up and successfully registered with jobmanager. This didn't 
happen for the offending taskmanager:
  
 At JobManager:
{noformat}
{"timeMillis":1557073368155,"thread":"flink-akka.actor.default-dispatcher-49","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Registered
 TaskManager at pipelineruntime-taskmgr-6789dd578b-dcp4r 
(akka.tcp://flink@10.60.5.85:8070/user/taskmanager) as 
ae61ac607f0ab35ab5066f7dc221e654. Current number of registered hosts is 8. 
Current number of alive task slots is 
51.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":125,"threadPriority":5}
...
...
{"timeMillis":1557073391386,"thread":"flink-akka.actor.default-dispatcher-82","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Unregistered
 task manager /10.60.5.85. Number of registered task managers 7. Number of 
available slots 
45.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":159,"threadPriority":5}
...
...
{"timeMillis":1557073391483,"thread":"flink-akka.actor.default-dispatcher-82","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Unregistered
 task manager /10.60.5.85. Number of registered task managers 6. Number of 
available slots 
39.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":159,"threadPriority":5}
...
...
{"timeMillis":1557073370389,"thread":"flink-akka.actor.default-dispatcher-35","level":"INFO","loggerName":"akka.actor.LocalActorRef","message":"Message
 [akka.remote.ReliableDeliverySupervisor$Ungate$] from 
Actor[akka://flink/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.60.5.85%3A8070-3#1863607260]
 to 
Actor[akka://flink/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.60.5.85%3A8070-3#1863607260]
 was not delivered. [22] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":98,"threadPriority":5}
{noformat}
At TaskManager:
{noformat}
{"timeMillis":1557073366068,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Starting
 
TaskManager","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5}
{"timeMillis":1557073366073,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Starting
 TaskManager actor system at 
10.60.5.85:8070.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5}
{"timeMillis":1557073366077,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
 to start actor system at 
10.60.5.85:8070","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5}
{"timeMillis":1557073366510,"thread":"flink-akka.actor.default-dispatcher-4","level":"INFO","loggerName":"akka.event.slf4j.Slf4jLogger","message":"Slf4jLogger
 
started","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":51,"threadPriority":5}
{"timeMillis":1557073366694,"thread":"flink-akka.actor.default-dispatcher-4","level":"INFO","loggerName":"akka.remote.Remoting","message":"Starting
 
remoting","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":51,"threadPriority":5}
{"timeMillis":1557073367049,"thread":"flink-akka.actor.default-dispatcher-4","level":"INFO","loggerName":"akka.remote.Remoting","message":"Remoting
 started; listening on addresses 
:[akka.tcp://flink@10.60.5.85:8070]","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":51,"threadPriority":5}
{"timeMillis":155

[jira] [Created] (FLINK-12437) Taskmanager doesn't initiate registration after jobmanager marks it terminated

2019-05-07 Thread Abdul Qadeer (JIRA)
Abdul Qadeer created FLINK-12437:


 Summary: Taskmanager doesn't initiate registration after 
jobmanager marks it terminated
 Key: FLINK-12437
 URL: https://issues.apache.org/jira/browse/FLINK-12437
 Project: Flink
  Issue Type: Bug
Reporter: Abdul Qadeer


This issue is observed in Standalone cluster deployment mode with Zookeeper HA 
enabled in Flink 1.4.0. A few taskmanagers restarted due to Out of Metaspace.
 The offending taskmanager `pipelineruntime-taskmgr-6789dd578b-dcp4r` first 
successfully registers with jobmanager, and the remote watcher marks it 
terminated soon after as seen in logs. There were other taskmanagers that were 
terminated around same time but they had been quarantined by jobmanager with 
message similar to:
{noformat}
Association to [akka.tcp://flink@10.60.5.121:8070] having UID [864976677] is 
irrecoverably failed. UID is now quarantined and all messages to this UID will 
be delivered to dead letters. Remote actorsystem must be restarted to recover 
from this situation.
{noformat}
They came back up and successfully registered with jobmanager. This didn't 
happen for the offending taskmanager:
  
 At JobManager:
{noformat}
{"timeMillis":1557073368155,"thread":"flink-akka.actor.default-dispatcher-49","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Registered
 TaskManager at pipelineruntime-taskmgr-6789dd578b-dcp4r 
(akka.tcp://flink@10.60.5.85:8070/user/taskmanager) as 
ae61ac607f0ab35ab5066f7dc221e654. Current number of registered hosts is 8. 
Current number of alive task slots is 
51.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":125,"threadPriority":5}
...
...
{"timeMillis":1557073391386,"thread":"flink-akka.actor.default-dispatcher-82","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Unregistered
 task manager /10.60.5.85. Number of registered task managers 7. Number of 
available slots 
45.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":159,"threadPriority":5}
...
...
{"timeMillis":1557073391483,"thread":"flink-akka.actor.default-dispatcher-82","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Unregistered
 task manager /10.60.5.85. Number of registered task managers 6. Number of 
available slots 
39.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":159,"threadPriority":5}
...
...
{"timeMillis":1557073370389,"thread":"flink-akka.actor.default-dispatcher-35","level":"INFO","loggerName":"akka.actor.LocalActorRef","message":"Message
 [akka.remote.ReliableDeliverySupervisor$Ungate$] from 
Actor[akka://flink/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.60.5.85%3A8070-3#1863607260]
 to 
Actor[akka://flink/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.60.5.85%3A8070-3#1863607260]
 was not delivered. [22] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":98,"threadPriority":5}
{noformat}
At TaskManager:
{noformat}
{"timeMillis":1557073366068,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Starting
 
TaskManager","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5}
{"timeMillis":1557073366073,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Starting
 TaskManager actor system at 
10.60.5.85:8070.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5}
{"timeMillis":1557073366077,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
 to start actor system at 
10.60.5.85:8070","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5}
{"timeMillis":1557073366510,"thread":"flink-akka.actor.default-dispatcher-4","level":"INFO","loggerName":"akka.event.slf4j.Slf4jLogger","message":"Slf4jLogger
 
started","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":51,"threadPriority":5}
{"timeMillis":1557073366694,"thread":"flink-akka.actor.default-dispatcher-4","level":"INFO","loggerName":"akka.remote.Remoting","message":"Starting
 
remoting","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":51,"threadPriority":5}
{"timeMillis":1557073367049,"thread":"flink-akka.actor.default-dispatcher-4","level":"INFO","loggerName":"akka.remote.Remoting","message":"Remoting
 started; listening on addresses 
:[ak

[jira] [Updated] (FLINK-12437) Taskmanager doesn't initiate registration after jobmanager marks it terminated

2019-05-07 Thread Abdul Qadeer (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdul Qadeer updated FLINK-12437:
-
Description: 
This issue is observed in Standalone cluster deployment mode with Zookeeper HA 
enabled in Flink 1.4.0. A few taskmanagers restarted due to Out of Metaspace.
 The offending taskmanager `pipelineruntime-taskmgr-6789dd578b-dcp4r` first 
successfully registers with jobmanager, and the remote watcher marks it 
terminated soon after as seen in logs. There were other taskmanagers that were 
terminated around same time but they had been quarantined by jobmanager with 
message similar to:
{noformat}
Association to [akka.tcp://flink@10.60.5.121:8070] having UID [864976677] is 
irrecoverably failed. UID is now quarantined and all messages to this UID will 
be delivered to dead letters. Remote actorsystem must be restarted to recover 
from this situation.
{noformat}
They came back up and successfully registered with jobmanager. This didn't 
happen for the offending taskmanager:
  
 At JobManager:
{noformat}
{"timeMillis":1557073368155,"thread":"flink-akka.actor.default-dispatcher-49","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Registered
 TaskManager at pipelineruntime-taskmgr-6789dd578b-dcp4r 
(akka.tcp://flink@10.60.5.85:8070/user/taskmanager) as 
ae61ac607f0ab35ab5066f7dc221e654. Current number of registered hosts is 8. 
Current number of alive task slots is 
51.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":125,"threadPriority":5}
...
...
{"timeMillis":1557073391386,"thread":"flink-akka.actor.default-dispatcher-82","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Unregistered
 task manager /10.60.5.85. Number of registered task managers 7. Number of 
available slots 
45.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":159,"threadPriority":5}
...
...
{"timeMillis":1557073391483,"thread":"flink-akka.actor.default-dispatcher-82","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Unregistered
 task manager /10.60.5.85. Number of registered task managers 6. Number of 
available slots 
39.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":159,"threadPriority":5}
...
...
{"timeMillis":1557073370389,"thread":"flink-akka.actor.default-dispatcher-35","level":"INFO","loggerName":"akka.actor.LocalActorRef","message":"Message
 [akka.remote.ReliableDeliverySupervisor$Ungate$] from 
Actor[akka://flink/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.60.5.85%3A8070-3#1863607260]
 to 
Actor[akka://flink/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.60.5.85%3A8070-3#1863607260]
 was not delivered. [22] dead letters encountered. This logging can be turned 
off or adjusted with configuration settings 'akka.log-dead-letters' and 
'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":98,"threadPriority":5}
{noformat}
At TaskManager:
{noformat}
{"timeMillis":1557073366068,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Starting
 
TaskManager","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5}
{"timeMillis":1557073366073,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Starting
 TaskManager actor system at 
10.60.5.85:8070.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5}
{"timeMillis":1557073366077,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
 to start actor system at 
10.60.5.85:8070","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5}
{"timeMillis":1557073366510,"thread":"flink-akka.actor.default-dispatcher-4","level":"INFO","loggerName":"akka.event.slf4j.Slf4jLogger","message":"Slf4jLogger
 
started","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":51,"threadPriority":5}
{"timeMillis":1557073366694,"thread":"flink-akka.actor.default-dispatcher-4","level":"INFO","loggerName":"akka.remote.Remoting","message":"Starting
 
remoting","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":51,"threadPriority":5}
{"timeMillis":1557073367049,"thread":"flink-akka.actor.default-dispatcher-4","level":"INFO","loggerName":"akka.remote.Remoting","message":"Remoting
 started; listening on addresses 
:[akka.tcp://flink@10.60.5.85:8070]","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":51,"threadPriority":5}
{"timeMillis":155

[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281861009
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -85,49 +86,39 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
 
// -- tables and views--
 
-   @Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
 
-   @Override
-   public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
+   validateHiveCatalogTable(table);
 
-   @Override
-   public List listTables(String databaseName)
-   throws DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   createHiveTable(
+   tablePath,
+   HiveCatalogUtil.createHiveTable(tablePath, table),
+   ignoreIfExists);
}
 
@Override
-   public List listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   validateHiveCatalogTable(newTable);
+
+   super.alterTable(tablePath, newTable, ignoreIfNotExists);
}
 
@Override
-   public CatalogBaseTable getTable(ObjectPath objectPath) throws 
TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   public CatalogBaseTable getTable(ObjectPath tablePath)
 
 Review comment:
   Similar comment here. I don't see we need getTable() here. getTable() is the 
same for both catalogs, and the only diff is the translation, which can be 
captured in a new API.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281860485
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -85,49 +86,39 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
 
// -- tables and views--
 
-   @Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
 
-   @Override
-   public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
+   validateHiveCatalogTable(table);
 
-   @Override
-   public List listTables(String databaseName)
-   throws DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   createHiveTable(
+   tablePath,
+   HiveCatalogUtil.createHiveTable(tablePath, table),
+   ignoreIfExists);
}
 
@Override
-   public List listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   validateHiveCatalogTable(newTable);
 
 Review comment:
   Similar to createTable(), I don't see we need to provide implementations for 
alterTable() in each subclass because I don't really see much difference. If 
validation is different, then we can define a new interface in the base and 
have each subclass implement it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281860069
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -85,49 +86,39 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
 
// -- tables and views--
 
-   @Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
 
 Review comment:
   I think we can keep the implementation only in HiveCatalogBase and define a 
new API called createHiveTable(). Each subcalss only needs to implement 
createHiveTable() (because they may create in different ways). I don't see why 
we need HiveCatalogUtil and related classes for this.
   
   This might be cleaner.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281857308
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveCatalogUtil.java
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.util;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.HiveCatalogDatabase;
+import org.apache.flink.table.catalog.hive.HiveCatalogTable;
+import org.apache.flink.table.catalog.hive.HiveTableConfig;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+
+/**
+ * Utils to convert meta objects between Flink and Hive for HiveCatalog.
+ */
+public class HiveCatalogUtil extends HiveCatalogBaseUtil {
+
+   private HiveCatalogUtil() {
+   }
+
+   // -- Utils --
 
 Review comment:
   Nit: The class name is called "utils", and is this line still necessary?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281856850
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveTableConfig.java
 ##
 @@ -19,11 +19,15 @@
 package org.apache.flink.table.catalog.hive;
 
 /**
- * Configs for Flink tables stored in Hive metastore.
+ * Configs for tables in Hive metastore.
  */
 public class HiveTableConfig {
 
-   // Description of the Flink table
+   // ---
 
 Review comment:
   Do the three lines (26-28) provide any additional info above the class name 
here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281856367
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogTable.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive;
+
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.plan.stats.TableStats;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Hive catalog table implementation.
+ */
+public class HiveCatalogTable implements CatalogTable {
+   // Schema of the table (column names and types)
+   private final TableSchema tableSchema;
+   // Partition keys if this is a partitioned table. It's an empty set if 
the table is not partitioned
+   private final List partitionKeys;
+   // Properties of the table
+   private final Map properties;
+   // Comment of the table
+   private String comment = "This is a hive catalog table.";
+
+   public HiveCatalogTable(
+   TableSchema tableSchema,
+   List partitionKeys,
+   Map properties,
+   String comment) {
+   this.tableSchema = checkNotNull(tableSchema, "tableSchema 
cannot be null");
+   this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys 
cannot be null");
+   this.properties = checkNotNull(properties, "properties cannot 
be null");
+   this.comment = comment;
+   }
+
+   public HiveCatalogTable(
+   TableSchema tableSchema,
+   Map properties,
+   String comment) {
+   this(tableSchema, new ArrayList<>(), properties, comment);
+   }
+
+   @Override
+   public TableStats getStatistics() {
+   return new TableStats(0);
+   }
+
+   @Override
+   public boolean isPartitioned() {
+   return !partitionKeys.isEmpty();
+   }
+
+   @Override
+   public List getPartitionKeys() {
+   return partitionKeys;
+   }
+
+   @Override
+   public Map getProperties() {
+   return properties;
+   }
+
+   @Override
+   public TableSchema getSchema() {
+   return tableSchema;
+   }
+
+   @Override
+   public String getComment() {
+   return comment;
+   }
+
+   @Override
+   public CatalogBaseTable copy() {
+   return new HiveCatalogTable(
+   tableSchema, new ArrayList<>(partitionKeys), new 
HashMap<>(properties), comment);
 
 Review comment:
   Don't we need to copy tableSchema?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12368) add subtask index to FlinkKafkaConsumerBase logging, which can be very useful when debugging problem

2019-05-07 Thread Steven Zhen Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu closed FLINK-12368.
--
   Resolution: Fixed
Fix Version/s: 1.9.0

Stephan merged the PR

> add subtask index to FlinkKafkaConsumerBase logging, which can be very useful 
> when debugging problem
> 
>
> Key: FLINK-12368
> URL: https://issues.apache.org/jira/browse/FLINK-12368
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.8.0
>Reporter: Steven Zhen Wu
>Assignee: Steven Zhen Wu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281854488
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +204,140 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   protected void createHiveTable(ObjectPath tablePath, Table table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(table);
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist. Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists. Thus, check the table existence 
explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   protected Table getHiveTable(ObjectPath tablePath) throws 
TableNotExistException {
+   try {
+   return client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+   } catch (NoSuchObjectException e) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to get table %s from Hive 
metastore", tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } else {
+   // IMetastoreClient.alter_table() requires the table to 
have a valid location, which it doesn't in this case
+   // Thus we have to translate alterTable() into 
(dropTable() + createTable())
+   dropTable(tablePath, false);
 
 Review comment:
   We also need to note that view and table are sharing this call, so it 
probably doesn't make sense to change a view to a regular table and vise versa.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service,

[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281854134
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +204,140 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   protected void createHiveTable(ObjectPath tablePath, Table table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(table);
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist. Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists. Thus, check the table existence 
explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   protected Table getHiveTable(ObjectPath tablePath) throws 
TableNotExistException {
+   try {
+   return client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+   } catch (NoSuchObjectException e) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to get table %s from Hive 
metastore", tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } else {
+   // IMetastoreClient.alter_table() requires the table to 
have a valid location, which it doesn't in this case
+   // Thus we have to translate alterTable() into 
(dropTable() + createTable())
+   dropTable(tablePath, false);
 
 Review comment:
   I think we needs to be very careful here. dropTable() might also drop the 
data, but alterTable() really is supposed to change the metadata only.
   
   We should probably solve the location problem rather than taking this 
workaround.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub a

[jira] [Closed] (FLINK-12240) Support view related operations in GenericHiveMetastoreCatalog

2019-05-07 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12240.

Resolution: Fixed

merged in 1.9.0: 94c08c5cb934a6aabd6f5a2fbba4af6704f80615

> Support view related operations in GenericHiveMetastoreCatalog
> --
>
> Key: FLINK-12240
> URL: https://issues.apache.org/jira/browse/FLINK-12240
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Support view related operations in GenericHiveMetastoreCatalog, which 
> implements ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r281853049
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -85,49 +86,39 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
 
// -- tables and views--
 
-   @Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
 
-   @Override
-   public void alterTable(ObjectPath tableName, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
+   validateHiveCatalogTable(table);
 
-   @Override
-   public List listTables(String databaseName)
-   throws DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   createHiveTable(
+   tablePath,
+   HiveCatalogUtil.createHiveTable(tablePath, table),
+   ignoreIfExists);
}
 
@Override
-   public List listViews(String databaseName) throws 
DatabaseNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   validateHiveCatalogTable(newTable);
 
 Review comment:
   Wouldn't we want to do similar validation for GenericHiveMetastoreCatalog?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12417) Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface

2019-05-07 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12417:
-
Summary: Unify ReadableCatalog and ReadableWritableCatalog interfaces to 
Catalog interface  (was: Merge ReadableCatalog and ReadableWritableCatalog)

> Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog 
> interface
> -
>
> Key: FLINK-12417
> URL: https://issues.apache.org/jira/browse/FLINK-12417
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As discussed with [~dawidwys], the original purpose to separate 
> ReadableCatalog and ReadableWritableCatalog is to isolate access to metadata. 
> However, we believe access control and authorization is orthogonal to design 
> of catalog APIs and should be of a different effort.
> Thus, we propose to merge ReadableCatalog and ReadableWritableCatalog to 
> simplify the design.
> cc [~twalthr] [~xuefuz]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface

2019-05-07 Thread GitBox
bowenli86 commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog 
and ReadableWritableCatalog interfaces to Catalog interface
URL: https://github.com/apache/flink/pull/8365#issuecomment-490271830
 
 
   @flinkbot attention @dawidwys 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support for AzureFS

2019-05-07 Thread GitBox
piyushnarang commented on issue #8117: [FLINK-12115] [filesystems]: Add support 
for AzureFS
URL: https://github.com/apache/flink/pull/8117#issuecomment-490264770
 
 
   @tillrohrmann - I'm not really sure I understand why we need the 
flink-hadoop-fs-shaded module. Is the whole point of that to reduce the size of 
the flink-azure-fs jar? Or something else? Moreover, is it essential to add 
that right now? We've iterated a fair bit on this PR already and if there is 
something that can be addressed in a future review, I'd prefer doing that 
rather than trying to tackle everything in this one. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog APIs

2019-05-07 Thread GitBox
xuefuz commented on issue #8314: [FLINK-12365][table] Add stats related catalog 
APIs
URL: https://github.com/apache/flink/pull/8314#issuecomment-490259634
 
 
   I have added more column stats types, leaving out Decimal because that's 
more involved to a different JIRA after this. @KurtYoung could you please take 
another look and merge this so that we can settle on the APIs at least. I have 
rebased this PR for quite a few times already. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12399) FilterableTableSource does not use filters on job run

2019-05-07 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835059#comment-16835059
 ] 

Fabian Hueske commented on FLINK-12399:
---

Thanks for looking into this [~walterddr]!

> FilterableTableSource does not use filters on job run
> -
>
> Key: FLINK-12399
> URL: https://issues.apache.org/jira/browse/FLINK-12399
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.8.0
>Reporter: Josh Bradt
>Assignee: Rong Rong
>Priority: Major
> Attachments: flink-filter-bug.tar.gz
>
>
> As discussed [on the mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Filter-push-down-not-working-for-a-custom-BatchTableSource-tp27654.html],
>  there appears to be a bug where a job that uses a custom 
> FilterableTableSource does not keep the filters that were pushed down into 
> the table source. More specifically, the table source does receive filters 
> via applyPredicates, and a new table source with those filters is returned, 
> but the final job graph appears to use the original table source, which does 
> not contain any filters.
> I attached a minimal example program to this ticket. The custom table source 
> is as follows: 
> {code:java}
> public class CustomTableSource implements BatchTableSource, 
> FilterableTableSource {
> private static final Logger LOG = 
> LoggerFactory.getLogger(CustomTableSource.class);
> private final Filter[] filters;
> private final FilterConverter converter = new FilterConverter();
> public CustomTableSource() {
> this(null);
> }
> private CustomTableSource(Filter[] filters) {
> this.filters = filters;
> }
> @Override
> public DataSet getDataSet(ExecutionEnvironment execEnv) {
> if (filters == null) {
>LOG.info(" No filters defined ");
> } else {
> LOG.info(" Found filters ");
> for (Filter filter : filters) {
> LOG.info("FILTER: {}", filter);
> }
> }
> return execEnv.fromCollection(allModels());
> }
> @Override
> public TableSource applyPredicate(List predicates) {
> LOG.info("Applying predicates");
> List acceptedFilters = new ArrayList<>();
> for (final Expression predicate : predicates) {
> converter.convert(predicate).ifPresent(acceptedFilters::add);
> }
> return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
> }
> @Override
> public boolean isFilterPushedDown() {
> return filters != null;
> }
> @Override
> public TypeInformation getReturnType() {
> return TypeInformation.of(Model.class);
> }
> @Override
> public TableSchema getTableSchema() {
> return TableSchema.fromTypeInfo(getReturnType());
> }
> private List allModels() {
> List models = new ArrayList<>();
> models.add(new Model(1, 2, 3, 4));
> models.add(new Model(10, 11, 12, 13));
> models.add(new Model(20, 21, 22, 23));
> return models;
> }
> }
> {code}
>  
> When run, it logs
> {noformat}
> 15:24:54,888 INFO  com.klaviyo.filterbug.CustomTableSource
>- Applying predicates
> 15:24:54,901 INFO  com.klaviyo.filterbug.CustomTableSource
>- Applying predicates
> 15:24:54,910 INFO  com.klaviyo.filterbug.CustomTableSource
>- Applying predicates
> 15:24:54,977 INFO  com.klaviyo.filterbug.CustomTableSource
>-  No filters defined {noformat}
> which appears to indicate that although filters are getting pushed down, the 
> final job does not use them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface

2019-05-07 Thread GitBox
flinkbot commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog 
and ReadableWritableCatalog interfaces to Catalog interface
URL: https://github.com/apache/flink/pull/8365#issuecomment-490221019
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 opened a new pull request #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface

2019-05-07 Thread GitBox
bowenli86 opened a new pull request #8365: [FLINK-12417][table] Unify 
ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface
URL: https://github.com/apache/flink/pull/8365
 
 
   ## What is the purpose of the change
   
   This PR unifies `ReadableCatalog` and `ReadableWritableCatalog` interfaces 
to `Catalog` interface to simplify the architecture and management of catalogs.
   
   ## Brief change log
   
 - Created a new interface `Catalog` that contains exactly the same APIs 
from `ReadableCatalog` and `ReadableWritableCatalog`
 - Removed `ReadableCatalog` and `ReadableWritableCatalog`
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12417) Merge ReadableCatalog and ReadableWritableCatalog

2019-05-07 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12417:
---
Labels: pull-request-available  (was: )

> Merge ReadableCatalog and ReadableWritableCatalog
> -
>
> Key: FLINK-12417
> URL: https://issues.apache.org/jira/browse/FLINK-12417
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> As discussed with [~dawidwys], the original purpose to separate 
> ReadableCatalog and ReadableWritableCatalog is to isolate access to metadata. 
> However, we believe access control and authorization is orthogonal to design 
> of catalog APIs and should be of a different effort.
> Thus, we propose to merge ReadableCatalog and ReadableWritableCatalog to 
> simplify the design.
> cc [~twalthr] [~xuefuz]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12417) Merge ReadableCatalog and ReadableWritableCatalog

2019-05-07 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835032#comment-16835032
 ] 

Bowen Li commented on FLINK-12417:
--

[~twalthr] Probably just "Catalog"? I understand the "Table" in {{TableConfig}} 
refers to Table API, but might confuse other people that it's only for "tables".

Yeah, the ReadOnlyCatalog can be added later whenever necessary

> Merge ReadableCatalog and ReadableWritableCatalog
> -
>
> Key: FLINK-12417
> URL: https://issues.apache.org/jira/browse/FLINK-12417
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> As discussed with [~dawidwys], the original purpose to separate 
> ReadableCatalog and ReadableWritableCatalog is to isolate access to metadata. 
> However, we believe access control and authorization is orthogonal to design 
> of catalog APIs and should be of a different effort.
> Thus, we propose to merge ReadableCatalog and ReadableWritableCatalog to 
> simplify the design.
> cc [~twalthr] [~xuefuz]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 opened a new pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-07 Thread GitBox
bowenli86 opened a new pull request #8353: [FLINK-12233][hive] Support table 
related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353
 
 
   ## What is the purpose of the change
   
   This PR introduced HiveCatalogTable and implemented table related catalog 
APIs in HiveCatalog.
   
   ## Brief change log
   
   - Created a basic `HiveCatalogTable`, and we can enrich this class later
   - Implemented table related catalog APIs in `HiveCatalog`
   - Extracted common logic between `HiveCatalog` and 
`GenericHiveMetastoreCatalog` into `HiveCatalogBase`
   - Moved a few util classes from package 
`org.apache.flink.table.catalog.hive` to 
`org.apache.flink.table.catalog.hive.util`
   
   
   ## Verifying this change
   
   Reuse tests in CatalogTestBase.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   
   
   This PR depends on https://github.com/apache/flink/pull/8339


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


  1   2   3   >