[GitHub] [flink] curcur edited a comment on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-04-26 Thread GitBox


curcur edited a comment on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591


   Sorry to write this much again. These are all good questions that I think are clearer to answer in a systematic way :-). Let's chat in detail this afternoon.
   
     1. Changes upon `SinkFunction`. 
   By "coordination", do you mean that other changes are also made upon 
`SinkFunction` and may need coordination?
   
   I expected reviewers to have strong reactions to the API change, and that's fine. But I am a bit confused about what is agreed or disagreed upon and what the suggested better way is, so let me clarify some of my thoughts and the reasoning behind why the API is changed in this way.
   
   As suggested by Stephan, in the PR I do have a custom operator `StreamShuffleSink` and a custom transformation in `SinkTransformation` for the custom operator. As Arvid mentioned in the previous reviews, there is a lot of code duplication between `StreamShuffleSink` and `StreamSink`.
   - That's true because they are very similar LOL, but we do want to provide a different operator to minimize the impact of changes on existing operators.
   - Personally, I prefer not to have multiple levels of extends/subclasses, especially if the superclass is not abstract. Multi-level extension makes code very difficult to read: you cannot easily track which functions/members a class contains in a straightforward way, especially without a good IDE.
   - Coming back to the duplication: it is about 100 lines of code in total, with very simple logic, so personally I would prefer to trade these `100` lines of code for `readability`.
   
   `SinkFunction`, as its name suggests, is the function invoked in the sink operator; it provides an `invoke` function to handle each record. `FlinkKafkaProducer` itself is a `TwoPhaseCommitSinkFunction`, which implements `SinkFunction`.
   If we really want to avoid changing `SinkFunction`, I can add a new interface and have the current `TwoPhaseCommitSinkFunction` implement the new interface, as sketched below. It should be safer than the current way and also avoids conflicts, if that's the concern.

   Please let me know what you think of this proposal.
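   To make the proposal concrete, here is a minimal sketch of what I have in mind; the interface and method names below are illustrative only, not something that exists in the PR today:

```java
// Hypothetical interface, sketched for discussion only: instead of widening
// SinkFunction itself, expose the richer per-record invoke through a separate
// interface and let TwoPhaseCommitSinkFunction (and thus FlinkKafkaProducer)
// implement it in addition to SinkFunction.
public interface ShuffleSinkFunction<IN> {

	/** Invoked once per record, with access to the element timestamp and the current watermark. */
	void invoke(IN value, ShuffleContext context) throws Exception;

	/** Minimal context the Kafka shuffle writer needs. */
	interface ShuffleContext {
		long currentWatermark();
		Long timestamp();
	}
}
```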
   
    2. `StreamElementSerializer`
   I cannot simply use `StreamElementSerializer` because the way watermarks are stored/handled is different. In short, if multiple sink subtasks write to the same partition (sink), we need a way to decide the watermark in the source (the downstream operator from the shuffle perspective).
   In the current netty shuffle service, we keep N channels and a watermark per channel; in this case, data and watermarks have already been merged when writing to partitions.
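   To illustrate the difference, here is a minimal sketch of the consumer-side merge, assuming each producer subtask tags the watermarks it writes with its own subtask index (the class and method names are illustrative, not the PR's actual API): the merged watermark can only advance to the minimum across all producers writing to the partition.

```java
import java.util.Arrays;

// Illustrative sketch only: tracks the latest watermark seen from each producer
// subtask of one partition and reports the minimum as the merged watermark.
final class PerPartitionWatermarkMerger {
	private final long[] perProducerWatermark;

	PerPartitionWatermarkMerger(int numProducerSubtasks) {
		perProducerWatermark = new long[numProducerSubtasks];
		Arrays.fill(perProducerWatermark, Long.MIN_VALUE);
	}

	/** Records a watermark from one producer subtask and returns the new merged watermark. */
	long onWatermark(int producerSubtaskIndex, long watermark) {
		perProducerWatermark[producerSubtaskIndex] =
			Math.max(perProducerWatermark[producerSubtaskIndex], watermark);
		return Arrays.stream(perProducerWatermark).min().getAsLong();
	}
}
```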
   
   Please refer to Jira FLINK-15670 for discussion about watermark:
   You can start from [here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232) to save some time. It includes my original thoughts, proposals and Stephan's enhanced version.
   
    3. "Checkpoints" and "Savepoints"
   As far as I know, savepoints are very similar to checkpoints except that savepoints are more or less user-facing. That is, a user can trigger a replay based on savepoints. I can kind of understand why you are saying "restoring from an old savepoint would completely screw up the data". It is true if you think of this problem from a global snapshotting and global failover perspective.
   
   However, let's step back and think about why we want to have the persistent shuffle in the first place. If data is persisted, you do not really need to replay the calculation again. Persistence is meant to remove the constraints between upstream and downstream.
   
   As for your concern, we do not need to do a global replay either. We can simply do a regional replay. If there is any constraint in the implementation, we can disable it for now. In the long term, I do not see it as a problem.
   
   But maybe I misunderstand you :-)
   
    4. Why do we need `KafkaShuffleProducer` and `KafkaShuffleFetcher`? Why not just reuse the current `KafkaProducer` and `KafkaConsumer`?
   I've actually had four versions of this POC (links are listed in the Jira). The first two use the existing Kafka producer and consumer. There are several reasons I decided to have separate ones.
   - Watermark write/read: as you can see, there is extra logic for handling watermarks on both the read and write side, and I do not want to interfere with existing Kafka reads/writes.
   - The current KafkaProducer can be used in two ways:
   through KeyedSerializationSchema (deprecated) and
   through KafkaSerializationSchema.
   If we want to use a custom partitioner, we have to use KeyedSerializationSchema (bundled).
   - KeyedSerializationSchema follows the Kafka key-value schema (the key is written to the key, and the value to the value); however, KafkaShuffle

[GitHub] [flink] xintongsong commented on a change in pull request #11916: [FLINK-17390][yarn] Fix Hadoop 2.10+ compatibility for WorkerSpecContainerResourceAdapter.

2020-04-26 Thread GitBox


xintongsong commented on a change in pull request #11916:
URL: https://github.com/apache/flink/pull/11916#discussion_r415529655



##
File path: flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
##
@@ -194,6 +198,41 @@ public void testMaxLimit() {
 
 		assertFalse(adapter.tryComputeContainerResource(workerSpec2).isPresent());
 	}
 
+	@Test
+	public void testMatchResourceWithDifferentImplementation() {
+		final WorkerSpecContainerResourceAdapter.MatchingStrategy strategy =
+			WorkerSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
+		final int minMemMB = 1;
+		final int minVcore = 1;
+
+		final WorkerSpecContainerResourceAdapter adapter =
+			new WorkerSpecContainerResourceAdapter(
+				getConfigProcessSpecEqualsWorkerSpec(),
+				minMemMB,
+				minVcore,
+				Integer.MAX_VALUE,
+				Integer.MAX_VALUE);
+
+		final WorkerResourceSpec workerSpec = new WorkerResourceSpec.Builder()
+			.setCpuCores(1.0)
+			.setTaskHeapMemoryMB(100)
+			.setTaskOffHeapMemoryMB(200)
+			.setNetworkMemoryMB(300)
+			.setManagedMemoryMB(400)
+			.build();
+
+		Optional<Resource> resourceOpt = adapter.tryComputeContainerResource(workerSpec);
+		assertTrue(resourceOpt.isPresent());
+		Resource resourceImpl1 = resourceOpt.get();
+
+		Resource resourceImpl2 = new TestingResourceImpl(
+			resourceImpl1.getMemory(),
+			resourceImpl1.getVirtualCores() + 1);

Review comment:
   I think it's just for getting a typical non-identical resource as an 
argument for `getEquivalentContainerResource`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11839: [FLINK-17166][dist] Modify the log4j-console.properties to also output logs into the files for WebUI

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11839:
URL: https://github.com/apache/flink/pull/11839#issuecomment-617042256


   
   ## CI report:
   
   * ab5e8324f0ace63d1e5b3f292dd6d517b056fd21 UNKNOWN
   * 6e097b629bafde50b88d261cd856d624cf6f89c5 Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/162139302) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=276)
 
   * 2ca466db8ef9f2c9ef74cdd830078610dd13506a Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162146799) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=282)
 
   * 2c1df7529825b6a0af7e57d29febeade36a758b1 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-15669) SQL client can't cancel flink job

2020-04-26 Thread Yu Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093025#comment-17093025
 ] 

Yu Li commented on FLINK-15669:
---

Since the fix is still good and only the test was reverted, I think it makes sense to still leave the fix version of this JIRA as 1.10.1. Maybe we could also close this one and open another JIRA to harden the test, what do you think? [~aljoscha] [~godfreyhe] Thanks.

> SQL client can't cancel flink job
> -
>
> Key: FLINK-15669
> URL: https://issues.apache.org/jira/browse/FLINK-15669
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.10.0
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> in sql client, CLI client do cancel query operation through {{void 
> cancelQuery(String sessionId, String resultId)}} method in {{Executor}}. 
> However, the {{resultId}} is a random UUID, is not the job id. So CLI client 
> can't cancel a running job.
> related code in {{LocalExecutor}}:
> {code:java}
> private  ResultDescriptor executeQueryInternal(String sessionId, 
> ExecutionContext context, String query) {
>..
>   // store the result with a unique id
>   final String resultId = UUID.randomUUID().toString();
>   resultStore.storeResult(resultId, result);
>   ..
>   // create execution
>   final ProgramDeployer deployer = new ProgramDeployer(
>   configuration, jobName, pipeline);
>   // start result retrieval
>   result.startRetrieval(deployer);
>   return new ResultDescriptor(
>   resultId,
>   removeTimeAttributes(table.getSchema()),
>   result.isMaterialized());
> }
> private  void cancelQueryInternal(ExecutionContext context, String 
> resultId) {
>   ..
>   // stop Flink job
>   try (final ClusterDescriptor clusterDescriptor = 
> context.createClusterDescriptor()) {
>   ClusterClient clusterClient = null;
>   try {
>   // retrieve existing cluster
>   clusterClient = 
> clusterDescriptor.retrieve(context.getClusterId()).getClusterClient();
>   try {
>   //  cancel job through resultId ===
>   clusterClient.cancel(new 
> JobID(StringUtils.hexStringToByte(resultId))).get();
>   } catch (Throwable t) {
>   // the job might has finished earlier
>   }
>   } catch (Exception e) {
>   throw new SqlExecutionException("Could not retrieve or 
> create a cluster.", e);
>   } finally {
>   try {
>   if (clusterClient != null) {
>   clusterClient.close();
>   }
>   } catch (Exception e) {
>   // ignore
>   }
>   }
>   } catch (SqlExecutionException e) {
>   throw e;
>   } catch (Exception e) {
>   throw new SqlExecutionException("Could not locate a cluster.", 
> e);
>   }
> }
> {code}
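
A standalone illustration of the mismatch described above (this is not code from the issue; the id handling is simplified): a JobID rebuilt from the random result UUID can never equal the id of the job that was actually submitted, so the cancel call has no effect.

{code:java}
import java.util.UUID;

import org.apache.flink.api.common.JobID;
import org.apache.flink.util.StringUtils;

public class ResultIdIsNotAJobId {
	public static void main(String[] args) {
		// what the SQL client stores: a random UUID (dashes stripped here so it parses as 16 bytes of hex)
		String resultId = UUID.randomUUID().toString().replace("-", "");
		JobID idBuiltFromResultId = new JobID(StringUtils.hexStringToByte(resultId));

		// what the cluster actually knows about
		JobID actualJobId = JobID.generate();

		System.out.println(idBuiltFromResultId.equals(actualJobId)); // false
	}
}
{code}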



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #11897: [FLINK-16104] Translate "Streaming Aggregation" page of "Table API & SQL" into Chinese

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11897:
URL: https://github.com/apache/flink/pull/11897#issuecomment-618812841


   
   ## CI report:
   
   * 1cd0f11272e1d4013f2f09a328c8777ae32e5470 Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/162046977) Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=260)
 
   * 67027fc17a69ef4edc8300e4b82a9e1233b016ee UNKNOWN
   * da909ac1e2831251bf60375a080cc192e4e4a0c8 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162152427) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=283)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11916: [FLINK-17390][yarn] Fix Hadoop 2.10+ compatibility for WorkerSpecContainerResourceAdapter.

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11916:
URL: https://github.com/apache/flink/pull/11916#issuecomment-619702040


   
   ## CI report:
   
   * 92fb1fc93dd94819389bdedf93a13f2dc31397c7 Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/162145331) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=281)
 
   * 6c4876c5e0c19afae95c808429442ad65c2585ca Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162152459) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=284)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-17338) LocalExecutorITCase.testBatchQueryCancel test timeout

2020-04-26 Thread Yu Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093023#comment-17093023
 ] 

Yu Li commented on FLINK-17338:
---

Thanks for the analysis and clarification [~aljoscha]

> LocalExecutorITCase.testBatchQueryCancel test timeout
> -
>
> Key: FLINK-17338
> URL: https://issues.apache.org/jira/browse/FLINK-17338
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client, Table SQL / Legacy Planner, Table 
> SQL / Planner
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Robert Metzger
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.10.1, 1.11.0
>
>
> CI https://travis-ci.org/github/apache/flink/jobs/678185941
> {code}
> [INFO] ---
> [INFO]  T E S T S
> [INFO] ---
> [INFO] Running org.apache.flink.table.client.gateway.local.LocalExecutorITCase
> [ERROR] Tests run: 70, Failures: 0, Errors: 1, Skipped: 5, Time elapsed: 
> 226.359 s <<< FAILURE! - in 
> org.apache.flink.table.client.gateway.local.LocalExecutorITCase
> [ERROR] testBatchQueryCancel[Planner: 
> old](org.apache.flink.table.client.gateway.local.LocalExecutorITCase)  Time 
> elapsed: 30.009 s  <<< ERROR!
> org.junit.runners.model.TestTimedOutException: test timed out after 3 
> milliseconds
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.table.client.gateway.local.LocalExecutorITCase.testBatchQueryCancel(LocalExecutorITCase.java:733)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> [INFO] 
> [INFO] Results:
> [INFO] 
> [ERROR] Errors: 
> [ERROR]   LocalExecutorITCase.testBatchQueryCancel:733 » TestTimedOut test 
> timed out aft...
> [INFO] 
> [ERROR] Tests run: 70, Failures: 0, Errors: 1, Skipped: 5
> [INFO] 
> [INFO] 
> 
> [INFO] Reactor Summary:
> [INFO] 
> [INFO] flink-table-common . SUCCESS [  9.955 
> s]
> [INFO] flink-table-api-java ... SUCCESS [  4.615 
> s]
> [INFO] flink-table-api-java-bridge  SUCCESS [  4.150 
> s]
> [INFO] flink-table-api-scala .. SUCCESS [  2.843 
> s]
> [INFO] flink-table-api-scala-bridge ... SUCCESS [  2.843 
> s]
> [INFO] flink-cep .. SUCCESS [ 30.868 
> s]
> [INFO] flink-table-planner  SUCCESS [04:51 
> min]
> [INFO] flink-cep-scala  SUCCESS [  8.361 
> s]
> [INFO] flink-sql-client ... FAILURE [04:21 
> min]
> [INFO] flink-state-processor-api .. SKIPPED
> [INFO] 
> 
> [INFO] BUILD FAILURE
> [INFO] 
> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] xintongsong commented on a change in pull request #11916: [FLINK-17390][yarn] Fix Hadoop 2.10+ compatibility for WorkerSpecContainerResourceAdapter.

2020-04-26 Thread GitBox


xintongsong commented on a change in pull request #11916:
URL: https://github.com/apache/flink/pull/11916#discussion_r415523696



##
File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##
@@ -71,52 +72,65 @@
 	}
 
 	Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec workerResourceSpec) {
-		return Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+		final InternalContainerResource internalContainerResource = workerSpecToContainerResource.computeIfAbsent(
 			Preconditions.checkNotNull(workerResourceSpec),
-			this::createAndMapContainerResource));
+			this::createAndMapContainerResource);
+		if (internalContainerResource != null) {
+			return Optional.of(internalContainerResource.toResource());
+		} else {
+			return Optional.empty();
+		}
 	}
 
 	Set<WorkerResourceSpec> getWorkerSpecs(final Resource containerResource, final MatchingStrategy matchingStrategy) {
-		return getEquivalentContainerResource(containerResource, matchingStrategy).stream()
+		final InternalContainerResource internalContainerResource = new InternalContainerResource(containerResource);
+		return getEquivalentInternalContainerResource(internalContainerResource, matchingStrategy).stream()
 			.flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptySet()).stream())
 			.collect(Collectors.toSet());
 	}
 
 	Set<Resource> getEquivalentContainerResource(final Resource containerResource, final MatchingStrategy matchingStrategy) {
+		final InternalContainerResource internalContainerResource = new InternalContainerResource(containerResource);
+		return getEquivalentInternalContainerResource(internalContainerResource, matchingStrategy).stream()
+			.map(InternalContainerResource::toResource)
+			.collect(Collectors.toSet());
+	}
+
+	private Set<InternalContainerResource> getEquivalentInternalContainerResource(final InternalContainerResource internalContainerResource, final MatchingStrategy matchingStrategy) {
 		// Yarn might ignore the requested vcores, depending on its configurations.
 		// In such cases, we should also not matching vcores.
-		final Set<Resource> equivalentContainerResources;
+		final Set<InternalContainerResource> equivalentInternalContainerResources;
 		switch (matchingStrategy) {
 			case MATCH_VCORE:
-				equivalentContainerResources = Collections.singleton(containerResource);
+				equivalentInternalContainerResources = Collections.singleton(internalContainerResource);
 				break;
 			case IGNORE_VCORE:
			default:
-				equivalentContainerResources = containerMemoryToContainerResource
-					.getOrDefault(containerResource.getMemory(), Collections.emptySet());
+				equivalentInternalContainerResources = containerMemoryToContainerResource
+					.getOrDefault(internalContainerResource.memory, Collections.emptySet());
 				break;
 		}
-		return equivalentContainerResources;
+		return equivalentInternalContainerResources;
 	}
 
 	@Nullable
-	private Resource createAndMapContainerResource(final WorkerResourceSpec workerResourceSpec) {
+	private InternalContainerResource createAndMapContainerResource(final WorkerResourceSpec workerResourceSpec) {

Review comment:
   We cannot do that because it is `InternalContainerResource` we want to 
put into `workerSpecToContainerResource`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xintongsong commented on a change in pull request #11916: [FLINK-17390][yarn] Fix Hadoop 2.10+ compatibility for WorkerSpecContainerResourceAdapter.

2020-04-26 Thread GitBox


xintongsong commented on a change in pull request #11916:
URL: https://github.com/apache/flink/pull/11916#discussion_r415523468



##
File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##
@@ -71,52 +72,65 @@
 	}
 
 	Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec workerResourceSpec) {
-		return Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+		final InternalContainerResource internalContainerResource = workerSpecToContainerResource.computeIfAbsent(
 			Preconditions.checkNotNull(workerResourceSpec),
-			this::createAndMapContainerResource));
+			this::createAndMapContainerResource);
+		if (internalContainerResource != null) {
+			return Optional.of(internalContainerResource.toResource());
+		} else {
+			return Optional.empty();
+		}
 	}
 
 	Set<WorkerResourceSpec> getWorkerSpecs(final Resource containerResource, final MatchingStrategy matchingStrategy) {
-		return getEquivalentContainerResource(containerResource, matchingStrategy).stream()
+		final InternalContainerResource internalContainerResource = new InternalContainerResource(containerResource);
+		return getEquivalentInternalContainerResource(internalContainerResource, matchingStrategy).stream()

Review comment:
   Same here. I'd like to make it explicit that `Resource` is only used in 
arguments and return values of methods visible from outside, and everywhere 
else should use `InternalContainerResource`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xintongsong commented on a change in pull request #11916: [FLINK-17390][yarn] Fix Hadoop 2.10+ compatibility for WorkerSpecContainerResourceAdapter.

2020-04-26 Thread GitBox


xintongsong commented on a change in pull request #11916:
URL: https://github.com/apache/flink/pull/11916#discussion_r415522836



##
File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##
@@ -71,52 +72,65 @@
 	}
 
 	Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec workerResourceSpec) {
-		return Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+		final InternalContainerResource internalContainerResource = workerSpecToContainerResource.computeIfAbsent(
 			Preconditions.checkNotNull(workerResourceSpec),
-			this::createAndMapContainerResource));
+			this::createAndMapContainerResource);
+		if (internalContainerResource != null) {
+			return Optional.of(internalContainerResource.toResource());
+		} else {
+			return Optional.empty();
+		}
 	}
 
 	Set<WorkerResourceSpec> getWorkerSpecs(final Resource containerResource, final MatchingStrategy matchingStrategy) {
-		return getEquivalentContainerResource(containerResource, matchingStrategy).stream()
+		final InternalContainerResource internalContainerResource = new InternalContainerResource(containerResource);
+		return getEquivalentInternalContainerResource(internalContainerResource, matchingStrategy).stream()
 			.flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptySet()).stream())
 			.collect(Collectors.toSet());
 	}
 
 	Set<Resource> getEquivalentContainerResource(final Resource containerResource, final MatchingStrategy matchingStrategy) {
+		final InternalContainerResource internalContainerResource = new InternalContainerResource(containerResource);
+		return getEquivalentInternalContainerResource(internalContainerResource, matchingStrategy).stream()
+			.map(InternalContainerResource::toResource)
+			.collect(Collectors.toSet());
+	}
+
+	private Set<InternalContainerResource> getEquivalentInternalContainerResource(final InternalContainerResource internalContainerResource, final MatchingStrategy matchingStrategy) {

Review comment:
   I see your point. But I think the benefit of the current approach is that we make it explicit that, for all methods visible from outside, the first thing we do is convert `Resource` into `InternalContainerResource`, and the last thing we do before returning is to convert it back.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-17399) CsvTableSink should also extend from OverwritableTableSink

2020-04-26 Thread godfrey he (Jira)
godfrey he created FLINK-17399:
--

 Summary: CsvTableSink should also extend from OverwritableTableSink
 Key: FLINK-17399
 URL: https://issues.apache.org/jira/browse/FLINK-17399
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: godfrey he
 Fix For: 1.11.0


{{CsvTableSink}} has supported {{writeMode}}, which can be {{OVERWRITE}} or {{NO_OVERWRITE}}. When we execute "INSERT OVERWRITE csv_table_sink xx", the planners check whether a table sink is an {{OverwritableTableSink}}.
Currently {{CsvTableSink}} does not extend from {{OverwritableTableSink}}, so we can't execute the above statement.
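
A minimal sketch of the planner-side check this improvement targets (simplified, not the actual planner code; the helper class below is hypothetical): "INSERT OVERWRITE" is only accepted when the sink implements {{OverwritableTableSink}}, which is why {{CsvTableSink}} should extend it.

{code:java}
import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.TableSink;

// Hypothetical helper, for illustration only.
final class OverwriteCheck {
	static void applyOverwrite(TableSink<?> sink, boolean overwrite) {
		if (!overwrite) {
			return;
		}
		if (!(sink instanceof OverwritableTableSink)) {
			// this is the rejection path that "INSERT OVERWRITE csv_table_sink xx" currently hits
			throw new IllegalStateException(
				sink.getClass().getName() + " does not implement OverwritableTableSink.");
		}
		((OverwritableTableSink) sink).setOverwrite(true);
	}
}
{code}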




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #11916: [FLINK-17390][yarn] Fix Hadoop 2.10+ compatibility for WorkerSpecContainerResourceAdapter.

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11916:
URL: https://github.com/apache/flink/pull/11916#issuecomment-619702040


   
   ## CI report:
   
   * 92fb1fc93dd94819389bdedf93a13f2dc31397c7 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162145331) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=281)
 
   * 6c4876c5e0c19afae95c808429442ad65c2585ca UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11915: [FLINK-17395][python] Add the sign and sha logic for PyFlink wheel packages

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11915:
URL: https://github.com/apache/flink/pull/11915#issuecomment-619697532


   
   ## CI report:
   
   * dc1358b12a72c8fb87289083c585dd8345f41d86 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/162143714) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=278)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11897: [FLINK-16104] Translate "Streaming Aggregation" page of "Table API & SQL" into Chinese

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11897:
URL: https://github.com/apache/flink/pull/11897#issuecomment-618812841


   
   ## CI report:
   
   * 1cd0f11272e1d4013f2f09a328c8777ae32e5470 Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/162046977) Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=260)
 
   * 67027fc17a69ef4edc8300e4b82a9e1233b016ee UNKNOWN
   * da909ac1e2831251bf60375a080cc192e4e4a0c8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] KarmaGYZ commented on a change in pull request #11916: [FLINK-17390][yarn] Fix Hadoop 2.10+ compatibility for WorkerSpecContainerResourceAdapter.

2020-04-26 Thread GitBox


KarmaGYZ commented on a change in pull request #11916:
URL: https://github.com/apache/flink/pull/11916#discussion_r415511862



##
File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##
@@ -71,52 +72,65 @@
 	}
 
 	Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec workerResourceSpec) {
-		return Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+		final InternalContainerResource internalContainerResource = workerSpecToContainerResource.computeIfAbsent(
 			Preconditions.checkNotNull(workerResourceSpec),
-			this::createAndMapContainerResource));
+			this::createAndMapContainerResource);
+		if (internalContainerResource != null) {
+			return Optional.of(internalContainerResource.toResource());
+		} else {
+			return Optional.empty();
+		}
 	}
 
 	Set<WorkerResourceSpec> getWorkerSpecs(final Resource containerResource, final MatchingStrategy matchingStrategy) {
-		return getEquivalentContainerResource(containerResource, matchingStrategy).stream()
+		final InternalContainerResource internalContainerResource = new InternalContainerResource(containerResource);
+		return getEquivalentInternalContainerResource(internalContainerResource, matchingStrategy).stream()
 			.flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptySet()).stream())
 			.collect(Collectors.toSet());
 	}
 
 	Set<Resource> getEquivalentContainerResource(final Resource containerResource, final MatchingStrategy matchingStrategy) {
+		final InternalContainerResource internalContainerResource = new InternalContainerResource(containerResource);
+		return getEquivalentInternalContainerResource(internalContainerResource, matchingStrategy).stream()
+			.map(InternalContainerResource::toResource)
+			.collect(Collectors.toSet());
+	}
+
+	private Set<InternalContainerResource> getEquivalentInternalContainerResource(final InternalContainerResource internalContainerResource, final MatchingStrategy matchingStrategy) {

Review comment:
   I would only keep `Set<Resource> getEquivalentContainerResource(final Resource containerResource, final MatchingStrategy matchingStrategy)`, to avoid duplicating the conversion code `final InternalContainerResource internalContainerResource = new InternalContainerResource(containerResource);`.

##
File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##
@@ -71,52 +72,65 @@
 	}
 
 	Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec workerResourceSpec) {
-		return Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+		final InternalContainerResource internalContainerResource = workerSpecToContainerResource.computeIfAbsent(
 			Preconditions.checkNotNull(workerResourceSpec),
-			this::createAndMapContainerResource));
+			this::createAndMapContainerResource);
+		if (internalContainerResource != null) {
+			return Optional.of(internalContainerResource.toResource());
+		} else {
+			return Optional.empty();
+		}
 	}
 
 	Set<WorkerResourceSpec> getWorkerSpecs(final Resource containerResource, final MatchingStrategy matchingStrategy) {
-		return getEquivalentContainerResource(containerResource, matchingStrategy).stream()
+		final InternalContainerResource internalContainerResource = new InternalContainerResource(containerResource);
+		return getEquivalentInternalContainerResource(internalContainerResource, matchingStrategy).stream()

Review comment:
   Seems equivalent to `getEquivalentContainerResource(final Resource 
containerResource, final MatchingStrategy matchingStrategy)`.

##
File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##
@@ -71,52 +72,65 @@
 	}
 
 	Optional<Resource> tryComputeContainerResource(final WorkerResourceSpec workerResourceSpec) {
-		return Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+		final InternalContainerResource internalContainerResource = workerSpecToContainerResource.computeIfAbsent(
 			Preconditions.checkNotNull(workerResourceSpec),
-			this::createAndMapContainerResource));
+			this::createAndMapContainerResource);
+		if (internalContainerResource != null) {
+			return Optional.of(internalContainerResource.toResource());
+		} else {
+

[GitHub] [flink] dianfu commented on a change in pull request #11915: [FLINK-17395][python] Add the sign and sha logic for PyFlink wheel packages

2020-04-26 Thread GitBox


dianfu commented on a change in pull request #11915:
URL: https://github.com/apache/flink/pull/11915#discussion_r415511471



##
File path: tools/releasing/create_binary_release.sh
##
@@ -109,14 +112,30 @@ make_python_release() {
 
   cp ${pyflink_actual_name} "${RELEASE_DIR}/${pyflink_release_name}"
 
+  # RM need to move the downloaded wheel packages from Azure CI to the directory of flink-python/dist manually.
+  for wheel_file in *.whl; do
+    if [[ ! ${wheel_file} =~ ^apache_flink-$PYFLINK_VERSION- ]]; then
Review comment:
   So the name of the wheel package is `apache_flink-version.whl` instead 
of `apache-flink-version.whl`?

##
File path: tools/releasing/create_binary_release.sh
##
@@ -94,7 +94,10 @@ make_python_release() {
   cd flink-python/
   # use lint-python.sh script to create a python environment.
   dev/lint-python.sh -s basic
+  set +eu

Review comment:
   Why do we need this change?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11876: [FLINK-17334] HIVE UDF BUGFIX

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11876:
URL: https://github.com/apache/flink/pull/11876#issuecomment-618262946


   
   ## CI report:
   
   * 2a7b13805feb162cf26a374609d6e03af6a58990 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/162145298) Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=280)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11839: [FLINK-17166][dist] Modify the log4j-console.properties to also output logs into the files for WebUI

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11839:
URL: https://github.com/apache/flink/pull/11839#issuecomment-617042256


   
   ## CI report:
   
   * ab5e8324f0ace63d1e5b3f292dd6d517b056fd21 UNKNOWN
   * 6e097b629bafde50b88d261cd856d624cf6f89c5 Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/162139302) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=276)
 
   * 2ca466db8ef9f2c9ef74cdd830078610dd13506a Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162146799) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=282)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11351: [FLINK-16404][runtime] Avoid caching buffers for blocked input channels before barrier alignment

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11351:
URL: https://github.com/apache/flink/pull/11351#issuecomment-596351676


   
   ## CI report:
   
   * 715889a35cfcc3aaf1b17f39dadaa86f755cc75d UNKNOWN
   * 548b22258f5e87fd53b45c7f4bb6de40bfd4e6d2 UNKNOWN
   * 9179aab74eeaf80ea30c0894f3e5a0171338baed UNKNOWN
   * cd385c55e9dc111a061e19e2387f2ae9ce21369e UNKNOWN
   * e869d82f1aa639c02af4a82c5026f939bf4e8f6c UNKNOWN
   * f86372054c1c4724b8dabc8d06e369475e64ac29 UNKNOWN
   * a0c7ff983988f27f5e47f4c71c8fb1ef28f8a24a UNKNOWN
   * f2ba8a55cb8e441a1377b2cc00957e13e7445e47 UNKNOWN
   * 3f566d7286ffa73fb26b729d76ccb5129ca3f974 UNKNOWN
   * d05bfb1ccc492a22a738da35135e372f7f2c48dc Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/162139169) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=275)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xintongsong commented on a change in pull request #11916: [FLINK-17390][yarn] Fix Hadoop 2.10+ compatibility for WorkerSpecContainerResourceAdapter.

2020-04-26 Thread GitBox


xintongsong commented on a change in pull request #11916:
URL: https://github.com/apache/flink/pull/11916#discussion_r415508834



##
File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##
@@ -128,12 +142,61 @@ private int normalize(final int value, final int unitValue) {
 		return MathUtils.divideRoundUp(value, unitValue) * unitValue;
 	}
 
-	boolean resourceWithinMaxAllocation(final Resource resource) {
-		return resource.getMemory() <= maxMemMB && resource.getVirtualCores() <= maxVcore;
+	boolean resourceWithinMaxAllocation(final InternalContainerResource resource) {
+		return resource.memory <= maxMemMB && resource.vcores <= maxVcore;
 	}
 
 	enum MatchingStrategy {
 		MATCH_VCORE,
 		IGNORE_VCORE
 	}
+
+	/**
+	 * An {@link InternalContainerResource} corresponds to a {@link Resource}.
+	 * This class is for {@link WorkerSpecContainerResourceAdapter} internal usages only, to overcome the problem that
+	 * hash codes are calculated inconsistently across different {@link Resource} implementations.
+	 */
+	private class InternalContainerResource {
+		private final int memory;
+		private final int vcores;
+
+		private InternalContainerResource(final int memory, final int vcores) {
+			this.memory = memory;
+			this.vcores = vcores;
+		}
+
+		private InternalContainerResource(final Resource resource) {
+			this(
+				Preconditions.checkNotNull(resource).getMemory(),
+				Preconditions.checkNotNull(resource).getVirtualCores());
+		}
+
+		private Resource toResource() {
+			return Resource.newInstance(memory, vcores);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == this) {
+				return true;
+			} else if (obj != null && obj.getClass() == InternalContainerResource.class) {
Review comment:
   I assume the concern is that `equals()` might be invoked on a subclass 
instance of `InternalContainerResource`? To that end, I think we can make the 
`InternalContainerResource` a final class, since it's not supposed to be 
inherited.
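
   A sketch of what that could look like (simplified; the actual class in the PR may differ): with a final class, `equals`/`hashCode` depend only on `memory` and `vcores`, so two instances compare equal regardless of which Hadoop `Resource` implementation they were created from.

```java
// Illustrative sketch only (nested inside WorkerSpecContainerResourceAdapter).
private static final class InternalContainerResource {
	private final int memory;
	private final int vcores;

	private InternalContainerResource(final int memory, final int vcores) {
		this.memory = memory;
		this.vcores = vcores;
	}

	@Override
	public boolean equals(Object obj) {
		if (obj == this) {
			return true;
		}
		if (obj == null || obj.getClass() != InternalContainerResource.class) {
			return false;
		}
		final InternalContainerResource that = (InternalContainerResource) obj;
		return this.memory == that.memory && this.vcores == that.vcores;
	}

	@Override
	public int hashCode() {
		return 31 * memory + vcores;
	}
}
```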





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-17224) Precision of TIME type does not work correctly

2020-04-26 Thread Danny Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092992#comment-17092992
 ] 

Danny Chen commented on FLINK-17224:


[~dwysakowicz] I have fixed the TIME type conversion between our LogicalType and RelDataType [1], but there are still 2 problems:

1. we only support TIME with precision 0 in the Blink planner runtime
2. we do not support type conversion between different TIME types yet, either from a "CAST" expression or output conversion, i.e. from TIME(2) to TIME(1)

[1] 
https://github.com/danny0405/flink/commit/20224ac7203599070105aad34e2b7b74c621f867

> Precision of TIME type does not work correctly
> --
>
> Key: FLINK-17224
> URL: https://issues.apache.org/jira/browse/FLINK-17224
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Dawid Wysakowicz
>Assignee: Danny Chen
>Priority: Critical
>
> The support for precision in TIME type does not work correctly causing many 
> different often cryptic problems.
> Precision is completely ignored in {{FlinkTypeFactory:440-446}}:
> {code}
>   case TIME =>
> if (relDataType.getPrecision > 3) {
>   throw new TableException(
> s"TIME precision is not supported: ${relDataType.getPrecision}")
> }
> // blink runner support precision 3, but for consistent with flink 
> runner, we set to 0.
> new TimeType()
> {code}
> Example problem:
> {code}
> @Test
> public void testTimeScalarFunction() throws Exception {
>   int nanoOfDay = 10 * 1_000_000;
>   final List sourceData = Collections.singletonList(
>   Row.of(LocalTime.ofNanoOfDay(nanoOfDay))
>   );
>   final List sinkData = Arrays.asList(
>   Row.of(nanoOfDay)
>   );
>   TestCollectionTableFactory.reset();
>   TestCollectionTableFactory.initData(sourceData);
>   tEnv().sqlUpdate("CREATE TABLE SourceTable(s TIME(2)) WITH ('connector' 
> = 'COLLECTION')");
>   tEnv().sqlUpdate("CREATE TABLE SinkTable(s BIGINT) WITH ('connector' = 
> 'COLLECTION')");
>   tEnv().from("SourceTable")
>   .select(call(new TimeScalarFunction(), $("s")))
>   .insertInto("SinkTable");
>   tEnv().execute("Test Job");
>   assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
> }
> public static class TimeScalarFunction extends ScalarFunction {
>   public Long eval(@DataTypeHint("TIME(1)") LocalTime time) {
>   return time.toNanoOfDay();
>   }
> }
> {code}
> fails with:
> {code}
> org.apache.flink.table.api.ValidationException: Invalid function call:
> org$apache$flink$table$planner$runtime$stream$table$FunctionITCase$TimeScalarFunction$a19cd231ba10cbbc0b55ebeda49e2a77(TIME(0))
>   at 
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:198)
>   at 
> org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnType(TypeInferenceReturnInference.java:73)
>   at 
> org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:486)
>   at 
> org.apache.calcite.rex.RexBuilder.deriveReturnType(RexBuilder.java:277)
>   at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:576)
>   at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:583)
>   at 
> org.apache.flink.table.planner.expressions.converter.FunctionDefinitionConvertRule.convert(FunctionDefinitionConvertRule.java:67)
>   at 
> org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
>   at 
> org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
>   at 
> org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122)
>   at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:681)
>   at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:128)
>   at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$2(QueryOperationConverter.java:487)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> 

[GitHub] [flink] godfreyhe commented on a change in pull request #11862: [FLINK-17251] [table] supports INSERT statement in TableEnvironment#executeSql and Table#executeInsert api

2020-04-26 Thread GitBox


godfreyhe commented on a change in pull request #11862:
URL: https://github.com/apache/flink/pull/11862#discussion_r415500318



##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
##
@@ -1113,4 +1113,61 @@
 * 
 */
FlatAggregateTable flatAggregate(Expression tableAggregateFunction);
+
+   /**
+* Writes the {@link Table} to a {@link TableSink} that was registered 
under the specified path,
+* and then execute the insert operation.
+*
+* See the documentation of {@link 
TableEnvironment#useDatabase(String)} or
+* {@link TableEnvironment#useCatalog(String)} for the rules on the 
path resolution.
+*
+* A batch {@link Table} can only be written to a
+* {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming 
{@link Table} requires a
+* {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a
+* {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an
+* {@code org.apache.flink.table.sinks.UpsertStreamTableSink}.
+*
+* Example:
+*
+* 
+* {@code
+*   Table table = tableEnv.fromQuery("select * from MyTable");
+*   TableResult tableResult = table.executeInsert("MySink");
+*   tableResult...
+* }
+* 
+*
+* @param tablePath The path of the registered TableSink to which the 
Table is written.
+* @return The insert operation execution result.
+*/
+   TableResult executeInsert(String tablePath);
+
+   /**
+* Writes the {@link Table} to a {@link TableSink} that was registered 
under the specified path,
+* and then execute the insert operation.
+*
+* See the documentation of {@link 
TableEnvironment#useDatabase(String)} or
+* {@link TableEnvironment#useCatalog(String)} for the rules on the 
path resolution.
+*
+* A batch {@link Table} can only be written to a
+* {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming 
{@link Table} requires a
+* {@code org.apache.flink.table.sinks.AppendStreamTableSink}, a
+* {@code org.apache.flink.table.sinks.RetractStreamTableSink}, or an
+* {@code org.apache.flink.table.sinks.UpsertStreamTableSink}.
+*
+* Example:
+*
+* 
+* {@code
+*   Table table = tableEnv.fromQuery("select * from MyTable");
+*   TableResult tableResult = table.executeInsert("MySink", true);
+*   tableResult...
+* }
+* 
+*
+* @param tablePath The path of the registered TableSink to which the 
Table is written.
+* @param overwrite The flag that indicates whether the insert should 
overwrite existing data or not.
+* @return The insert operation execution result.
+*/
+   TableResult executeInsert(String tablePath, boolean overwrite);

Review comment:
   > IIRC not all the table sinks support overwrite. The logic of dealing 
with `overwrite=true` seems not complete.
   
   Both planners already check whether a sink is an `OverwritableTableSink`.
   
   I will add some tests for overwrite mode.
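   
   For reference, a hedged sketch of how an overwrite test could exercise the new API (table and sink names are illustrative, and the sink registered under "MySink" is assumed to be an `OverwritableTableSink`; registration is omitted):
   
```java
// Illustrative usage only.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
// ... register "MyTable" and the overwritable sink "MySink" ...
Table table = tEnv.sqlQuery("SELECT * FROM MyTable");
TableResult tableResult = table.executeInsert("MySink", true); // overwrite = true
```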





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11916: [FLINK-17390][yarn] Fix Hadoop 2.10+ compatibility for WorkerSpecContainerResourceAdapter.

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11916:
URL: https://github.com/apache/flink/pull/11916#issuecomment-619702040


   
   ## CI report:
   
   * 92fb1fc93dd94819389bdedf93a13f2dc31397c7 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162145331) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=281)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11876: [FLINK-17334] HIVE UDF BUGFIX

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11876:
URL: https://github.com/apache/flink/pull/11876#issuecomment-618262946


   
   ## CI report:
   
   * 5dc081058c881b0165b44c37e53607ac891814f5 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/161774926) Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=170)
 
   * 2a7b13805feb162cf26a374609d6e03af6a58990 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162145298) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=280)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11791: [FLINK-17210][sql-parser][hive] Implement database DDLs for Hive dialect

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11791:
URL: https://github.com/apache/flink/pull/11791#issuecomment-615162346


   
   ## CI report:
   
   * a6a3a9e51fd9ba484d1ab4ccd0646dcacc0692ff Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/162072493) Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=271)
 
   * 5ce52e3cda62c629df16d95ccb345d830b3a3531 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162145244) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=279)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11839: [FLINK-17166][dist] Modify the log4j-console.properties to also output logs into the files for WebUI

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11839:
URL: https://github.com/apache/flink/pull/11839#issuecomment-617042256


   
   ## CI report:
   
   * 5a26fbb56d996236c296328a6d673bc01982be5f Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/161221412) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7856)
 
   * ab5e8324f0ace63d1e5b3f292dd6d517b056fd21 UNKNOWN
   * 6e097b629bafde50b88d261cd856d624cf6f89c5 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162139302) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=276)
 
   * 2ca466db8ef9f2c9ef74cdd830078610dd13506a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-04-26 Thread GitBox


curcur edited a comment on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591


   Sorry to write this much again. These are all good questions that I think are clearer to answer in a systematic way :-). Let's chat in detail this afternoon.
   
     1. Changes upon `SinkFunction`. 
   By "coordination", do you mean that other changes are also made upon `SinkFunction` and may need coordination?
   
   I expected reviewers to have strong reactions to the API change, and that's fine. But I am a bit confused about what is agreed or disagreed upon and what the suggested better way is, so let me clarify some of my thoughts and the reasoning behind why the API is changed in this way.
   
   As suggested by Stephan, in the PR I do have a custom operator `StreamShuffleSink` and a custom transformation in `SinkTransformation` for the custom operator. As Arvid mentioned in the previous reviews, there is a lot of code duplication between `StreamShuffleSink` and `StreamSink`.
   - That's true because they are very similar LOL, but we do want to provide a different operator to minimize the impact of changes on existing operators.
   - Personally, I prefer not to have multiple levels of extends/subclasses, especially if the superclass is not abstract. Multi-level extension makes code very difficult to read: you cannot easily track which functions/members a class contains in a straightforward way, especially without a good IDE.
   - Coming back to the duplication: it is about 100 lines of code in total, with very simple logic, so personally I would prefer to trade these `100` lines of code for `readability`.
   
   `SinkFunction`, as its name suggests, is the function invoked in the sink operator; it provides an `invoke` function to handle each record. `FlinkKafkaProducer` itself is a `TwoPhaseCommitSinkFunction`, which implements `SinkFunction`.
   If we really want to avoid changing `SinkFunction`, I can add a new interface and have the current `TwoPhaseCommitSinkFunction` implement the new interface. It should be safer than the current way and also avoids conflicts, if that's the concern.

   Please let me know what you think of this proposal.
   
    2. `StreamElementSerializer`
   I cannot simply use `StreamElementSerializer` because the way watermarks are stored/handled is different. In short, if multiple sink subtasks write to the same partition (sink), we need a way to decide the watermark in the source (the downstream operator from the shuffle perspective).
   In the current netty shuffle service, we keep N channels and a watermark per channel; in this case, data and watermarks have already been merged when writing to partitions.
   
   Please refer to Jira FLINK-15670 for discussion about watermark:
   You can start from [here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232) to save some time. It includes my original thoughts, proposals and Stephan's enhanced version.
   
    3. "Checkpoints" and "Savepoints"
   As far as I know, savepoints are very similar to checkpoints except that savepoints are more or less user-facing. That is, a user can trigger a replay based on savepoints. I can kind of understand why you are saying "restoring from an old savepoint would completely screw up the data". It is true if you think of this problem from a global snapshotting and global failover perspective.
   
   However, let's step back and think about why we want to have the persistent shuffle in the first place. If data is persisted, you do not really need to replay the calculation again. Persistence is meant to remove the constraints between upstream and downstream.
   
   As for your concern, we do not need to do a global replay either. We can simply do a regional replay. If there is any constraint in the implementation, we can disable it for now. In the long term, I do not see it as a problem.
   
   But maybe I misunderstand you :-)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-04-26 Thread GitBox


curcur edited a comment on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591


   Sorry to write this much again. These are all good questions that I think is 
more clear to answer in a systematic way :-). Let's chat in detail this 
afternoon.
   
     1. Change upon `SinkFunction`. 
   By "coordination", do you mean that other changes are also made upon 
`SinkFunction` and may need coordination?
   
   I've expected reviewers to have strong reactions to the API change, that's 
fine. But I am a bit confused about what is agreed/disagreed and what is a 
suggested better way, so let me try to clarify some of my thoughts and reason 
about why the API is changed in this way.
   
   As suggested by Stephan, In the PR, I do have a custom operator 
`StreamShuffleSink` and a custom transformation in `SinkTransformation` for the 
custom operator. As Arvid mentioned in the previous reviews, there are a lot of 
code duplications between `StreamShuffleSink` and `StreamSink`. 
   - That's true because they are very similar LOL, but we do want to provide a 
different operator to minimize the impact of changes on existing operators.
   - Personally, I do not prefer to have multi-levels of extends/subclasses, 
especially if the superclass is not abstract. Multi-level extensions make code 
very difficult to read. You can not easily track what functions/members a class 
contains in a straightforward way, especially without a good IDE. 
   - Come back to the duplication. There are in total 100 lines of code, with 
very simple logic. So personally I would prefer to trade these `100` lines of 
code for `readability`.
   
   `SinkFunction` as its name, is the function invoked in the sink operator to 
provide a invoke function to handle record. `FlinkKafkaProducer` itself is a 
TwoPhaseCommitSinkFunction which implements `SinkFunction`.
   If we really want to avoid changing `SinkFunction`, I can have a new 
interface and have the current TwoPhaseCommitSinkFunction implements the new 
interface. It should be safer than the current way, and also avoids conflicts 
if that's the concern.

   Please let me know what do you think of this proposal.
   
    2. `StreamElementSerializer`; 
   I can not simply use `StreamElementSerializer` because the way watermark is 
stored/handled is different. In short, if multiple sink subtasks write to the 
same partition (sink), we need a way to decide the watermark in the source 
(downstream operator from the shuffle perspective). 
   In the current netty shuffle service, we keep N channels and watermarks in 
each channel; while in this case, data and watermarks have been merged when 
writing to partitions.
   
   Please refer to Jira FLINK-15670 for discussion about watermark: 
   You can start from 
[here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232)
 to save some time. It includes my original thoughts, proposals and Stephan's 
enhanced version.
   
    3. "Checkpoints" and "Savepoints"
   As far as I know, savepoints are very similar to checkpoints except that 
savepoints are more or less user-faced. That says user can trigger a replay 
based on save points. I guess I can kind of understanding why you are saying 
"restoring from an old savepoint would completely screw up the data". It is 
true if you think of this problem from a global snapshotting and global 
failover perspective. 
   
   However, let's step back and think of why we we want to have the persistent 
shuffle in the first place. If data is persisted, you do not really need to 
replay the calculation again. Persistency is to unleash the constraints between 
upstream and downstream.
   
   For your concern, we do not need to do a global replay as well. We can 
simply do a regional replay. If there is any constraints in implementation, we 
can disable it for now. In the long term, I do not see it is a problem.
   
   But, Maybe I misunderstand you :-)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-04-26 Thread GitBox


curcur edited a comment on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591


   Sorry to write this much again. These are all good questions that I think is 
more clear to answer in a systematic way :-). Let's chat in detail this 
afternoon.
   
     1. Change to `SinkFunction`. 
   By "coordination", do you mean that other changes are also made upon 
`SinkFunction` and may need coordination?
   
   I've expected reviewers to have strong reactions to the API change, that's 
fine. But I am a bit confused about what is agreed/disagreed and what is a 
suggested better way, so let me try to clarify some of my thoughts and reason 
about why the API is changed in this way.
   
   As suggested by Stephan, In the PR, I do have a custom operator 
`StreamShuffleSink` and a custom transformation in `SinkTransformation` for the 
custom operator. As Arvid mentioned in the previous reviews, there are a lot of 
code duplications between `StreamShuffleSink` and `StreamSink`. 
   - That's true because they are very similar LOL, but we do want to provide a 
different operator to minimize the impact of changes on existing operators.
   - Personally, I do not prefer to have multi-levels of extends/subclasses, 
especially if the superclass is not abstract. Multi-level extensions make code 
very difficult to read. You can not easily track what functions/members a class 
contains in a straightforward way, especially without a good IDE. 
   - Come back to the duplication. There are in total 100 lines of code, with 
very simple logic. So personally I would prefer to trade these `100` lines of 
code for `readability`.
   
   `SinkFunction` as its name, is the function invoked in the sink operator to 
provide a invoke function to handle record. `FlinkKafkaProducer` itself is a 
TwoPhaseCommitSinkFunction which implements `SinkFunction`.
   If we really want to avoid changing `SinkFunction`, I can have a new 
interface and have the current TwoPhaseCommitSinkFunction implements the new 
interface. It should be safer than the current way, and also avoids conflicts 
if that's the concern.

   Please let me know what do you think of this proposal.
   
    2. `StreamElementSerializer`; 
   I can not simply use `StreamElementSerializer` because the way watermark is 
stored/handled is different. In short, if multiple sink subtasks write to the 
same partition (sink), we need a way to decide the watermark in the source 
(downstream operator from the shuffle perspective). 
   In the current netty shuffle service, we keep N channels and watermarks in 
each channel; while in this case, data and watermarks have been merged when 
writing to partitions.
   
   Please refer to Jira FLINK-15670 for discussion about watermark: 
   You can start from 
[here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232)
 to save some time. It includes my original thoughts, proposals and Stephan's 
enhanced version.
   
    3. "Checkpoints" and "Savepoints"
   As far as I know, savepoints are very similar to checkpoints except that 
savepoints are more or less user-faced. That says user can trigger a replay 
based on save points. I guess I can kind of understanding why you are saying 
"restoring from an old savepoint would completely screw up the data". It is 
true if you think of this problem from a global snapshotting and global 
failover perspective. 
   
   However, let's step back and think of why we we want to have the persistent 
shuffle in the first place. If data is persisted, you do not really need to 
replay the calculation again. Persistency is to unleash the constraints between 
upstream and downstream.
   
   For your concern, we do not need to do a global replay as well. We can 
simply do a regional replay. If there is any constraints in implementation, we 
can disable it for now. In the long term, I do not see it is a problem.
   
   But, Maybe I misunderstand you :-)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-04-26 Thread GitBox


curcur edited a comment on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591


   Sorry to write this much again. These are all good questions that I think is 
more clear to answer in a systematic way :-). Let's chat in detail this 
afternoon.
   
     1. Change to `SinkFunction`. 
   By "coordination", do you mean that other changes are also made upon 
`SinkFunction` and may need coordination?
   
   I've expected reviewers to have strong reactions to the API change, that's 
fine. But I am a bit confused about what is agreed/disagreed and what is a 
suggested better way, so let me try to clarify some of my thoughts and reason 
about why the API is changed in this way.
   
   As suggested by Stephan, In the PR, I do have a custom operator 
`StreamShuffleSink` and a custom transformation in `SinkTransformation` for the 
custom operator. As Arvid mentioned in the previous reviews, there are a lot of 
code duplications between `StreamShuffleSink` and `StreamSink`. 
   - That's true because they are very similar LOL, but we do want to provide a 
different operator to minimize the impact of changes on existing operators.
   - Personally, I do not prefer to have multi-levels of extends/subclasses, 
especially if the superclass is not abstract. Multi-level extensions make code 
very difficult to read. You can not easily track what functions/members a class 
contains in a straightforward way, especially without a good IDE. 
   - Come back to the duplication. There are in total 100 lines of code, with 
very simple logic. So personally I would prefer to trade these `100` lines of 
code for `readability`.
   
   `SinkFunction` as its name, is the function invoked in the sink operator to 
provide a invoke function to handle record. `FlinkKafkaProducer` itself is a 
TwoPhaseCommitSinkFunction which implements `SinkFunction`.
   If we really want to avoid changing `SinkFunction`, I can have a new 
interface and have the current TwoPhaseCommitSinkFunction implements the new 
interface. It should be safer than the current way, and also avoids conflicts 
if that's the concern.

   Please let me know what do you think of this proposal.
   
    2. `StreamElementSerializer`; 
   I can not simply use `StreamElementSerializer` because the way watermark is 
stored/handled is different. In short, if multiple sink subtasks write to the 
same partition (sink), we need a way to decide the watermark in the source 
(downstream operator from the shuffle perspective). 
   In the current netty shuffle service, we keep N channels and watermarks in 
each channel; while in this case, data and watermarks have been merged when 
writing to partitions.
   
   Please refer to Jira FLINK-15670 for discussion about watermark: 
   You can start from 
[here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232)
   
   It includes my original thoughts, proposals and Stephan's enhanced version.
   
    3. "Checkpoints" and "Savepoints"
   As far as I know, savepoints are very similar to checkpoints except that 
savepoints are more or less user-faced. That says user can trigger a replay 
based on save points. I guess I can kind of understanding why you are saying 
"restoring from an old savepoint would completely screw up the data". It is 
true if you think of this problem from a global snapshotting and global 
failover perspective. 
   
   However, let's step back and think of why we we want to have the persistent 
shuffle in the first place. If data is persisted, you do not really need to 
replay the calculation again. Persistency is to unleash the constraints between 
upstream and downstream.
   
   For your concern, we do not need to do a global replay as well. We can 
simply do a regional replay. If there is any constraints in implementation, we 
can disable it for now. In the long term, I do not see it is a problem.
   
   But, Maybe I misunderstand you :-)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-04-26 Thread GitBox


curcur edited a comment on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591


   Sorry to write this much again. These are all good questions that I think is 
more clear to answer in a systematic way :-). Let's chat in detail this 
afternoon.
   
     1. Change to `SinkFunction`. 
   By "coordination", do you mean that other changes are also made upon 
`SinkFunction` and may need coordination?
   
   I've expected reviewers to have strong reactions to the API change, that's 
fine. But I am a bit confused about what is agreed/disagreed and what is a 
suggested better way, so let me try to clarify some of my thoughts and reason 
about why the API is changed in this way.
   
   As suggested by Stephan, In the PR, I do have a custom operator 
`StreamShuffleSink` and a custom transformation in `SinkTransformation` for the 
custom operator. As Arvid mentioned in the previous reviews, there are a lot of 
code duplications between `StreamShuffleSink` and `StreamSink`. 
   - That's true because they are very similar LOL, but we do want to provide a 
different operator to minimize the impact of changes on existing operators.
   - Personally, I do not prefer to have multi-levels of extends/subclasses, 
especially if the superclass is not abstract. Multi-level extensions make code 
very difficult to read. You can not easily track what functions/members a class 
contains in a straightforward way, especially without a good IDE. 
   - Come back to the duplication. There are in total 100 lines of code, with 
very simple logic. So personally I would prefer to trade these `100` lines of 
code for `readability`.
   
   `SinkFunction` as its name, is the function invoked in the sink operator to 
provide a invoke function to handle record. `FlinkKafkaProducer` itself is a 
TwoPhaseCommitSinkFunction which implements `SinkFunction`.
   If we really want to avoid changing `SinkFunction`, I can have a new 
interface and have the current TwoPhaseCommitSinkFunction implements the new 
interface. It should be safer than the current way, and also avoids conflicts 
if that's the concern.

   Please let me know what do you think of this proposal.
   
    2. `StreamElementSerializer`; 
   I can not simply use `StreamElementSerializer` because the way watermark is 
stored/handled is different. In short, if multiple sink subtasks write to the 
same partition (sink), we need a way to decide the watermark in the source 
(downstream operator from the shuffle perspective). 
   In the current netty shuffle service, we keep N channels and watermarks in 
each channel; while in this case, data and watermarks have been merged when 
writing to partitions.
   
   Please refer to Jira FLINK-15670 for discussion about watermark: 
https://issues.apache.org/jira/browse/FLINK-15670
   
   You can start from 
[here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232)
   
   It includes my original thoughts, proposals and Stephan's enhanced version.
   
    3. "Checkpoints" and "Savepoints"
   As far as I know, savepoints are very similar to checkpoints except that 
savepoints are more or less user-faced. That says user can trigger a replay 
based on save points. I guess I can kind of understanding why you are saying 
"restoring from an old savepoint would completely screw up the data". It is 
true if you think of this problem from a global snapshotting and global 
failover perspective. 
   
   However, let's step back and think of why we we want to have the persistent 
shuffle in the first place. If data is persisted, you do not really need to 
replay the calculation again. Persistency is to unleash the constraints between 
upstream and downstream.
   
   For your concern, we do not need to do a global replay as well. We can 
simply do a regional replay. If there is any constraints in implementation, we 
can disable it for now. In the long term, I do not see it is a problem.
   
   But, Maybe I misunderstand you :-)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-04-26 Thread GitBox


curcur edited a comment on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591


     1. Change to `SinkFunction`. 
   By "coordination", do you mean that other changes are also made upon 
`SinkFunction` and may need coordination?
   
   I've expected reviewers to have strong reactions to the API change, that's 
fine. But I am a bit confused about what is agreed/disagreed and what is a 
suggested better way, so let me try to clarify some of my thoughts and reason 
about why the API is changed in this way.
   
   As suggested by Stephan, In the PR, I do have a custom operator 
`StreamShuffleSink` and a custom transformation in `SinkTransformation` for the 
custom operator. As Arvid mentioned in the previous reviews, there are a lot of 
code duplications between `StreamShuffleSink` and `StreamSink`. 
   - That's true because they are very similar LOL, but we do want to provide a 
different operator to minimize the impact of changes on existing operators.
   - Personally, I do not prefer to have multi-levels of extends/subclasses, 
especially if the superclass is not abstract. Multi-level extensions make code 
very difficult to read. You can not easily track what functions/members a class 
contains in a straightforward way, especially without a good IDE. 
   - Come back to the duplication. There are in total 100 lines of code, with 
very simple logic. So personally I would prefer to trade these `100` lines of 
code for `readability`.
   
   `SinkFunction` as its name, is the function invoked in the sink operator to 
provide a invoke function to handle record. `FlinkKafkaProducer` itself is a 
TwoPhaseCommitSinkFunction which implements `SinkFunction`.
   If we really want to avoid changing `SinkFunction`, I can have a new 
interface and have the current TwoPhaseCommitSinkFunction implements the new 
interface. It should be safer than the current way, and also avoids conflicts 
if that's the concern.

   Please let me know what do you think of this proposal.
   
    2. `StreamElementSerializer`; 
   I can not simply use `StreamElementSerializer` because the way watermark is 
stored/handled is different. In short, if multiple sink subtasks write to the 
same partition (sink), we need a way to decide the watermark in the source 
(downstream operator from the shuffle perspective). 
   In the current netty shuffle service, we keep N channels and watermarks in 
each channel; while in this case, data and watermarks have been merged when 
writing to partitions.
   
   Please refer to Jira FLINK-15670 for discussion about watermark: 
https://issues.apache.org/jira/browse/FLINK-15670
   
   You can start from 
[here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232)
   
   It includes my original thoughts, proposals and Stephan's enhanced version.
   
    3. "Checkpoints" and "Savepoints"
   As far as I know, savepoints are very similar to checkpoints except that 
savepoints are more or less user-faced. That says user can trigger a replay 
based on save points. I guess I can kind of understanding why you are saying 
"restoring from an old savepoint would completely screw up the data". It is 
true if you think of this problem from a global snapshotting and global 
failover perspective. 
   
   However, let's step back and think of why we we want to have the persistent 
shuffle in the first place. If data is persisted, you do not really need to 
replay the calculation again. Persistency is to unleash the constraints between 
upstream and downstream.
   
   For your concern, we do not need to do a global replay as well. We can 
simply do a regional replay. If there is any constraints in implementation, we 
can disable it for now. In the long term, I do not see it is a problem.
   
   But, Maybe I misunderstand you :-)
   
   Sorry to write this much again. These are all good questions that I think is 
more clear to answer in a systematic way :-). Let's chat in details this 
afternoon. 
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-04-26 Thread GitBox


curcur edited a comment on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591


   ## 1. Change to `SinkFunction`. 
   By "coordination", do you mean that other changes are also made upon 
`SinkFunction` and may need coordination?
   
   I've expected reviewers to have strong reactions to the API change, that's 
fine. But I am a bit confused about what is agreed/disagreed and what is a 
suggested better way, so let me try to clarify some of my thoughts and reason 
about why the API is changed in this way.
   
   As suggested by Stephan, In the PR, I do have a custom operator 
`StreamShuffleSink` and a custom transformation in `SinkTransformation` for the 
custom operator. As Arvid mentioned in the previous reviews, there are a lot of 
code duplications between `StreamShuffleSink` and `StreamSink`. 
   - That's true because they are very similar LOL, but we do want to provide a 
different operator to minimize the impact of changes on existing operators.
   - Personally, I do not prefer to have multi-levels of extends/subclasses, 
especially if the superclass is not abstract. Multi-level extensions make code 
very difficult to read. You can not easily track what functions/members a class 
contains in a straightforward way, especially without a good IDE. 
   - Come back to the duplication. There are in total 100 lines of code, with 
very simple logic. So personally I would prefer to trade these `100` lines of 
code for `readability`.
   
   `SinkFunction` as its name, is the function invoked in the sink operator to 
provide a invoke function to handle record. `FlinkKafkaProducer` itself is a 
TwoPhaseCommitSinkFunction which implements `SinkFunction`.
   If we really want to avoid changing `SinkFunction`, I can have a new 
interface and have the current TwoPhaseCommitSinkFunction implements the new 
interface. It should be safer than the current way, and also avoids conflicts 
if that's the concern.

   Please let me know what do you think of this proposal.
   
   ## 2. `StreamElementSerializer`; 
   I can not simply use `StreamElementSerializer` because the way watermark is 
stored/handled is different. In short, if multiple sink subtasks write to the 
same partition (sink), we need a way to decide the watermark in the source 
(downstream operator from the shuffle perspective). 
   In the current netty shuffle service, we keep N channels and watermarks in 
each channel; while in this case, data and watermarks have been merged when 
writing to partitions.
   
   Please refer to Jira FLINK-15670 for discussion about watermark: 
https://issues.apache.org/jira/browse/FLINK-15670
   
   You can start from 
[here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232)
   
   It includes my original thoughts, proposals and Stephan's enhanced version.
   
   ## 3. "Checkpoints" and "Savepoints"
   As far as I know, savepoints are very similar to checkpoints except that 
savepoints are more or less user-faced. That says user can trigger a replay 
based on save points. I guess I can kind of understanding why you are saying 
"restoring from an old savepoint would completely screw up the data". It is 
true if you think of this problem from a global snapshotting and global 
failover perspective. 
   
   However, let's step back and think of why we we want to have the persistent 
shuffle in the first place. If data is persisted, you do not really need to 
replay the calculation again. Persistency is to unleash the constraints between 
upstream and downstream.
   
   For your concern, we do not need to do a global replay as well. We can 
simply do a regional replay. If there is any constraints in implementation, we 
can disable it for now. In the long term, I do not see it is a problem.
   
   But, Maybe I misunderstand you :-)
   
   Sorry to write this much again. These are all good questions that I think is 
more clear to answer in a systematic way :-). Let's chat in details this 
afternoon. 
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-04-26 Thread GitBox


curcur edited a comment on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591


   ###  1. Change to `SinkFunction`. 
   By "coordination", do you mean that other changes are also made upon 
`SinkFunction` and may need coordination?
   
   I've expected reviewers to have strong reactions to the API change, that's 
fine. But I am a bit confused about what is agreed/disagreed and what is a 
suggested better way, so let me try to clarify some of my thoughts and reason 
about why the API is changed in this way.
   
   As suggested by Stephan, In the PR, I do have a custom operator 
`StreamShuffleSink` and a custom transformation in `SinkTransformation` for the 
custom operator. As Arvid mentioned in the previous reviews, there are a lot of 
code duplications between `StreamShuffleSink` and `StreamSink`. 
   - That's true because they are very similar LOL, but we do want to provide a 
different operator to minimize the impact of changes on existing operators.
   - Personally, I do not prefer to have multi-levels of extends/subclasses, 
especially if the superclass is not abstract. Multi-level extensions make code 
very difficult to read. You can not easily track what functions/members a class 
contains in a straightforward way, especially without a good IDE. 
   - Come back to the duplication. There are in total 100 lines of code, with 
very simple logic. So personally I would prefer to trade these `100` lines of 
code for `readability`.
   
   `SinkFunction` as its name, is the function invoked in the sink operator to 
provide a invoke function to handle record. `FlinkKafkaProducer` itself is a 
TwoPhaseCommitSinkFunction which implements `SinkFunction`.
   If we really want to avoid changing `SinkFunction`, I can have a new 
interface and have the current TwoPhaseCommitSinkFunction implements the new 
interface. It should be safer than the current way, and also avoids conflicts 
if that's the concern.

   Please let me know what do you think of this proposal.
   
   ### 2. `StreamElementSerializer`; 
   I can not simply use `StreamElementSerializer` because the way watermark is 
stored/handled is different. In short, if multiple sink subtasks write to the 
same partition (sink), we need a way to decide the watermark in the source 
(downstream operator from the shuffle perspective). 
   In the current netty shuffle service, we keep N channels and watermarks in 
each channel; while in this case, data and watermarks have been merged when 
writing to partitions.
   
   Please refer to Jira FLINK-15670 for discussion about watermark: 
https://issues.apache.org/jira/browse/FLINK-15670
   
   You can start from 
[here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232)
   
   It includes my original thoughts, proposals and Stephan's enhanced version.
   
   ### 3. "Checkpoints" and "Savepoints"
   As far as I know, savepoints are very similar to checkpoints except that 
savepoints are more or less user-faced. That says user can trigger a replay 
based on save points. I guess I can kind of understanding why you are saying 
"restoring from an old savepoint would completely screw up the data". It is 
true if you think of this problem from a global snapshotting and global 
failover perspective. 
   
   However, let's step back and think of why we we want to have the persistent 
shuffle in the first place. If data is persisted, you do not really need to 
replay the calculation again. Persistency is to unleash the constraints between 
upstream and downstream.
   
   For your concern, we do not need to do a global replay as well. We can 
simply do a regional replay. If there is any constraints in implementation, we 
can disable it for now. In the long term, I do not see it is a problem.
   
   But, Maybe I misunderstand you :-)
   
   Sorry to write this much again. These are all good questions that I think is 
more clear to answer in a systematic way :-). Let's chat in details this 
afternoon. 
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur commented on pull request #11725: [FLINK-15670][API] Provide a Kafka Source/Sink pair as KafkaShuffle

2020-04-26 Thread GitBox


curcur commented on pull request #11725:
URL: https://github.com/apache/flink/pull/11725#issuecomment-619703591


   # 1. Change to `SinkFunction`
   By "coordination", do you mean that other changes are also being made to `SinkFunction` and may need coordination?
   
   I expected reviewers to have strong reactions to the API change; that's fine. But I am a bit confused about what is agreed/disagreed and what the suggested better way is, so let me try to clarify some of my thoughts and the reasoning behind why the API is changed this way.
   
   As suggested by Stephan, in the PR I do have a custom operator `StreamShuffleSink` and a custom transformation in `SinkTransformation` for the custom operator. As Arvid mentioned in the previous reviews, there is a lot of code duplication between `StreamShuffleSink` and `StreamSink`.
   - That's true because they are very similar LOL, but we do want to provide a different operator to minimize the impact of changes on existing operators.
   - Personally, I prefer not to have multiple levels of extends/subclasses, especially if the superclass is not abstract. Multi-level extension makes code very difficult to read: you cannot easily track what functions/members a class contains in a straightforward way, especially without a good IDE.
   - Coming back to the duplication: there are in total 100 lines of code, with very simple logic, so personally I would prefer to trade these `100` lines of code for `readability`.
   
   `SinkFunction`, as its name suggests, is the function invoked in the sink operator; it provides an `invoke` method to handle each record. `FlinkKafkaProducer` itself is a `TwoPhaseCommitSinkFunction`, which implements `SinkFunction`.
   If we really want to avoid changing `SinkFunction`, I can add a new interface and have the current `TwoPhaseCommitSinkFunction` implement the new interface. It should be safer than the current way, and it also avoids conflicts if that's the concern.

   Please let me know what you think of this proposal.
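   
   To make that alternative concrete, here is a minimal sketch of what I have in mind (the interface and method name are made up for illustration; this is not the code currently in the PR):
   
```java
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Hypothetical sketch only: a separate interface that watermark-aware sinks could
 * implement, so that SinkFunction itself stays untouched.
 */
public interface WatermarkAwareSinkFunction<IN> extends SinkFunction<IN> {

	/** Called by the sink operator for every watermark, in addition to invoke() for records. */
	void invokeWatermark(Watermark watermark) throws Exception;
}
```
   The sink operator could then check for this interface and only forward watermarks to sinks that opt in; everything else keeps using `SinkFunction` exactly as today.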
   
   # 2. `StreamElementSerializer`
   I cannot simply use `StreamElementSerializer` because the way watermarks are stored/handled is different. In short, if multiple sink subtasks write to the same partition (sink), we need a way to decide the watermark in the source (the downstream operator, from the shuffle perspective).
   In the current Netty shuffle service, we keep N channels and a watermark per channel; in this case, data and watermarks have been merged when writing to the partitions.
   
   Please refer to Jira FLINK-15670 for the discussion about watermarks: 
https://issues.apache.org/jira/browse/FLINK-15670
   
   You can start from 
[here](https://issues.apache.org/jira/browse/FLINK-15670?focusedCommentId=17053232=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17053232)
   
   It includes my original thoughts and proposals, and Stephan's enhanced version.
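   
   To illustrate what I mean by "deciding the watermark in the source", here is a rough sketch (not the implementation in the PR; the class and method names are invented):
   
```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: when several producer subtasks write watermarks into the same partition,
 * the consuming side can remember the latest watermark seen from each producer and
 * only emit the minimum across all of them.
 */
public class MergedWatermarkTracker {

	private final Map<Integer, Long> latestWatermarkPerProducer = new HashMap<>();
	private final int numberOfProducers;

	public MergedWatermarkTracker(int numberOfProducers) {
		this.numberOfProducers = numberOfProducers;
	}

	/** Returns the watermark that can safely be emitted downstream, or Long.MIN_VALUE if unknown. */
	public long onWatermark(int producerSubtask, long watermark) {
		latestWatermarkPerProducer.merge(producerSubtask, watermark, Math::max);
		if (latestWatermarkPerProducer.size() < numberOfProducers) {
			return Long.MIN_VALUE; // not all producers have reported a watermark yet
		}
		return latestWatermarkPerProducer.values().stream()
			.mapToLong(Long::longValue)
			.min()
			.getAsLong();
	}
}
```
   Roughly speaking, this is also why a watermark written into a partition needs to carry which producer subtask it came from, which is part of what makes the plain `StreamElementSerializer` format insufficient here.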
   
   # 3. "Checkpoints" and "Savepoints"
   As far as I know, savepoints are very similar to checkpoints except that savepoints are more or less user-facing; that is, a user can trigger a replay based on savepoints. I can kind of understand why you are saying "restoring from an old savepoint would completely screw up the data". It is true if you think of this problem from a global-snapshotting and global-failover perspective.
   
   However, let's step back and think about why we want to have the persistent shuffle in the first place. If the data is persisted, you do not really need to replay the calculation again; the point of persistence is to relax the constraints between upstream and downstream.
   
   As for your concern, we do not need to do a global replay either; we can simply do a regional replay. If there are any constraints in the implementation, we can disable it for now. In the long term, I do not see it as a problem.
   
   But maybe I misunderstand you :-)
   
   Sorry to write this much again. These are all good questions that I think are clearer to answer in a systematic way :-). Let's chat in detail this afternoon.
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11876: [FLINK-17334] HIVE UDF BUGFIX

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11876:
URL: https://github.com/apache/flink/pull/11876#issuecomment-618262946


   
   ## CI report:
   
   * 5dc081058c881b0165b44c37e53607ac891814f5 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/161774926) Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=170)
 
   * 2a7b13805feb162cf26a374609d6e03af6a58990 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #11916: [FLINK-17390][yarn] Fix Hadoop 2.10+ compatibility for WorkerSpecContainerResourceAdapter.

2020-04-26 Thread GitBox


flinkbot commented on pull request #11916:
URL: https://github.com/apache/flink/pull/11916#issuecomment-619702040


   
   ## CI report:
   
   * 92fb1fc93dd94819389bdedf93a13f2dc31397c7 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11915: [FLINK-17395][python] Add the sign and sha logic for PyFlink wheel packages

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11915:
URL: https://github.com/apache/flink/pull/11915#issuecomment-619697532


   
   ## CI report:
   
   * dc1358b12a72c8fb87289083c585dd8345f41d86 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162143714) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=278)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11791: [FLINK-17210][sql-parser][hive] Implement database DDLs for Hive dialect

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11791:
URL: https://github.com/apache/flink/pull/11791#issuecomment-615162346


   
   ## CI report:
   
   * a6a3a9e51fd9ba484d1ab4ccd0646dcacc0692ff Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/162072493) Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=271)
 
   * 5ce52e3cda62c629df16d95ccb345d830b3a3531 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] KarmaGYZ commented on a change in pull request #11916: [FLINK-17390][yarn] Fix Hadoop 2.10+ compatibility for WorkerSpecContainerResourceAdapter.

2020-04-26 Thread GitBox


KarmaGYZ commented on a change in pull request #11916:
URL: https://github.com/apache/flink/pull/11916#discussion_r415494145



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
##
@@ -128,12 +142,61 @@ private int normalize(final int value, final int unitValue) {
		return MathUtils.divideRoundUp(value, unitValue) * unitValue;
	}
 
-	boolean resourceWithinMaxAllocation(final Resource resource) {
-		return resource.getMemory() <= maxMemMB && resource.getVirtualCores() <= maxVcore;
+	boolean resourceWithinMaxAllocation(final InternalContainerResource resource) {
+		return resource.memory <= maxMemMB && resource.vcores <= maxVcore;
	}
 
	enum MatchingStrategy {
		MATCH_VCORE,
		IGNORE_VCORE
	}
+
+	/**
+	 * An {@link InternalContainerResource} corresponds to a {@link Resource}.
+	 * This class is for {@link WorkerSpecContainerResourceAdapter} internal usages only, to overcome the problem that
+	 * hash codes are calculated inconsistently across different {@link Resource} implementations.
+	 */
+	private class InternalContainerResource {
+		private final int memory;
+		private final int vcores;
+
+		private InternalContainerResource(final int memory, final int vcores) {
+			this.memory = memory;
+			this.vcores = vcores;
+		}
+
+		private InternalContainerResource(final Resource resource) {
+			this(
+				Preconditions.checkNotNull(resource).getMemory(),
+				Preconditions.checkNotNull(resource).getVirtualCores());
+		}
+
+		private Resource toResource() {
+			return Resource.newInstance(memory, vcores);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == this) {
+				return true;
+			} else if (obj != null && obj.getClass() == InternalContainerResource.class) {

Review comment:
   It might be better to replace `InternalContainerResource.class` with `getClass()`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wangyang0918 edited a comment on pull request #11839: [FLINK-17166][dist] Modify the log4j-console.properties to also output logs into the files for WebUI

2020-04-26 Thread GitBox


wangyang0918 edited a comment on pull request #11839:
URL: https://github.com/apache/flink/pull/11839#issuecomment-619471329


   @zentol Thanks a lot for your review.
   
   I agree with you that introducing the stream redirections may cause some problems, so we need to be very careful about enabling the redirection via `StdOutErrRedirector.redirectStdOutErr`. Currently, I only see the requirement when we want to write stdout/err to the log files and the console at the same time, and I have not seen a better solution: using `tee` would force us to use `grep` to filter out the log4j logging, just as discussed in the [ticket](https://issues.apache.org/jira/browse/FLINK-17166?focusedCommentId=17087643=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17087643).
   
   Not all entrypoints need to redirect the streams. Currently, I think only the Docker environment deployments need it (e.g. standalone, K8s). We could put it on the first line of the `main` of `XXXEntrypoint`, and I do not see a logging-system caching problem there. Since you said that the logging framework will cache the `OutputStream` at creation time, could you share more insights here?
   
   My most important concern with introducing the stream redirection is not about mixing the `.out` and `.err` files. It is that we would have to filter the log4j logging out of stdout/err when using `tee`, and that filtering is not a stable or good solution.
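   
   To make the idea concrete, here is a rough sketch of the kind of in-JVM redirection I mean (this is not the actual `StdOutErrRedirector` implementation; the class and file names below are made up for illustration):
   
```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;

/**
 * Sketch only: duplicate everything written to stdout into a log file while still
 * printing to the original console, i.e. a `tee` inside the JVM, without having to
 * grep the log4j output back out of the console stream afterwards.
 */
public final class StdOutTeeSketch {

	public static void redirectStdOut(String logFile) throws IOException {
		final PrintStream console = System.out;
		final OutputStream file = new FileOutputStream(logFile, true);
		System.setOut(new PrintStream(new OutputStream() {
			@Override
			public void write(int b) throws IOException {
				console.write(b);
				file.write(b);
			}

			@Override
			public void flush() throws IOException {
				console.flush();
				file.flush();
			}
		}, true));
	}

	public static void main(String[] args) throws IOException {
		redirectStdOut("taskmanager.out"); // hypothetical file name
		System.out.println("this line goes to both the console and taskmanager.out");
	}
}
```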
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] godfreyhe commented on a change in pull request #11862: [FLINK-17251] [table] supports INSERT statement in TableEnvironment#executeSql and Table#executeInsert api

2020-04-26 Thread GitBox


godfreyhe commented on a change in pull request #11862:
URL: https://github.com/apache/flink/pull/11862#discussion_r415491412



##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.ModifyOperation;
+
+import java.util.List;
+
+/**
+ * An internal interface of {@link TableEnvironment}
+ * that defines extended methods used for {@link TableImpl}.
+ */
+@Internal
+public interface TableEnvironmentInternal extends TableEnvironment {
+
+   /**
+* Return a {@link Parser} that provides methods for parsing a SQL 
string.
+*
+* @return initialized {@link Parser}.
+*/
+   Parser getParser();
+
+   /**
+* Returns a {@link CatalogManager} that deals with all catalog objects.
+*/
+   CatalogManager getCatalogManager();
+
+   /**
+* Execute the given operations and return the execution result.
+*
+* @param operations The operations to be executed.
+* @return the affected row counts (-1 means unknown).
+*/
+   TableResult executeOperations(List<ModifyOperation> operations);

Review comment:
   we may support update operation in the future which also extends from 
`ModifyOperation`. How about rename it to `execute(List 
operations)` ?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] godfreyhe commented on a change in pull request #11862: [FLINK-17251] [table] supports INSERT statement in TableEnvironment#executeSql and Table#executeInsert api

2020-04-26 Thread GitBox


godfreyhe commented on a change in pull request #11862:
URL: https://github.com/apache/flink/pull/11862#discussion_r415490512



##
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.ModifyOperation;
+
+import java.util.List;
+
+/**
+ * An internal interface of {@link TableEnvironment}
+ * that defines extended methods used for {@link TableImpl}.
+ */
+@Internal
+public interface TableEnvironmentInternal extends TableEnvironment {

Review comment:
   `@Internal` is a weak constraint. If users do not look into the source code, they cannot even tell whether a method is an internal method. These internal methods may be unstable, so I tend to think we should not expose them in the public interface.
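   
   For example (just a sketch, and it assumes the implementation class picks up the new interface as this PR proposes), internal callers can reach these methods with a cast, so they never have to show up on the public `TableEnvironment`:
   
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.delegation.Parser;

public class InternalAccessSketch {

	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

		// Down-cast inside internal code only (e.g. TableImpl); users who stick to the
		// public TableEnvironment interface never see getParser()/getCatalogManager().
		Parser parser = ((TableEnvironmentInternal) tEnv).getParser();
		System.out.println("parser available: " + (parser != null));
	}
}
```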





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #11915: [FLINK-17395][python] Add the sign and sha logic for PyFlink wheel packages

2020-04-26 Thread GitBox


flinkbot commented on pull request #11915:
URL: https://github.com/apache/flink/pull/11915#issuecomment-619697532


   
   ## CI report:
   
   * dc1358b12a72c8fb87289083c585dd8345f41d86 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11860: [FLINK-17311][python][AZP] Add the logic of compressed in tgz before uploading artifacts

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11860:
URL: https://github.com/apache/flink/pull/11860#issuecomment-617666482


   
   ## CI report:
   
   * c94cfb4fb285019fc788b683f006b3bd233da1e0 Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/162139324) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=277)
 
   * db62ebb5051ceaadac054238d6c82d7f2a72 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] lirui-apache commented on a change in pull request #11791: [FLINK-17210][sql-parser][hive] Implement database DDLs for Hive dialect

2020-04-26 Thread GitBox


lirui-apache commented on a change in pull request #11791:
URL: https://github.com/apache/flink/pull/11791#discussion_r415487853



##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/FlinkSqlParserImplFactory.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation;
+
+import org.apache.flink.sql.parser.hive.impl.FlinkHiveSqlParserImpl;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+
+import org.apache.calcite.sql.parser.SqlAbstractParserImpl;
+import org.apache.calcite.sql.parser.SqlParserImplFactory;
+import org.apache.calcite.sql.validate.SqlConformance;
+
+import java.io.Reader;
+
+/**
+ * A SqlParserImplFactory that creates the parser according to SqlConformance.
+ */
+public class FlinkSqlParserImplFactory implements SqlParserImplFactory {
+
+   private final SqlConformance conformance;
+
+   public FlinkSqlParserImplFactory(SqlConformance conformance) {
+   this.conformance = conformance;
+   }
+
+   @Override
+   public SqlAbstractParserImpl getParser(Reader stream) {
+   if (conformance == FlinkSqlConformance.HIVE) {
+   return FlinkHiveSqlParserImpl.FACTORY.getParser(stream);
+   } else {
+   return FlinkSqlParserImpl.FACTORY.getParser(stream);
+   }
+   }

Review comment:
   OK, sounds good.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] Ruan-Xin commented on pull request #11876: [FLINK-17334] HIVE UDF BUGFIX

2020-04-26 Thread GitBox


Ruan-Xin commented on pull request #11876:
URL: https://github.com/apache/flink/pull/11876#issuecomment-619694919


   @flinkbot run travis re-run the last Travis build
   @flinkbot run azure re-run the last Azure build



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-17398) Filesystem support flexible path reading

2020-04-26 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-17398:
-
Summary: Filesystem support flexible path reading  (was: Support flexible 
path reading)

> Filesystem support flexible path reading
> 
>
> Key: FLINK-17398
> URL: https://issues.apache.org/jira/browse/FLINK-17398
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Jingsong Lee
>Priority: Major
>
> Like:
>  * Single file reading
>  * wildcard path reading



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17398) Support flexible path reading

2020-04-26 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-17398:


 Summary: Support flexible path reading
 Key: FLINK-17398
 URL: https://issues.apache.org/jira/browse/FLINK-17398
 Project: Flink
  Issue Type: Sub-task
Reporter: Jingsong Lee


Like:
 * Single file reading
 * wildcard path reading



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] godfreyhe commented on a change in pull request #11892: [FLINK-17112][table] Support DESCRIBE statement in Flink SQL

2020-04-26 Thread GitBox


godfreyhe commented on a change in pull request #11892:
URL: https://github.com/apache/flink/pull/11892#discussion_r415486982



##
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
##
@@ -721,6 +721,15 @@ abstract class TableEnvImpl(
 dropViewOperation.isIfExists)
 }
 TableResultImpl.TABLE_RESULT_OK
+  case descOperation: DescribeTableOperation =>
+val result = catalogManager.getTable(descOperation.getSqlIdentifier)
+if (result.isPresent) {
+  buildShowResult(Array(result.get().getTable.getSchema.toString))

Review comment:
   I find the SQL parser supports column comments 
([see](https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java#L225)),
 but we do not store the comments in `TableSchema`, so we can ignore the `comment` column.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #11916: [FLINK-17390][yarn] Fix Hadoop 2.10+ compatibility for WorkerSpecContainerResourceAdapter.

2020-04-26 Thread GitBox


flinkbot commented on pull request #11916:
URL: https://github.com/apache/flink/pull/11916#issuecomment-619694310


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 92fb1fc93dd94819389bdedf93a13f2dc31397c7 (Mon Apr 27 
03:36:23 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-17390).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-17397) Filesystem support lookup table source

2020-04-26 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-17397:


 Summary: Filesystem support lookup table source
 Key: FLINK-17397
 URL: https://issues.apache.org/jira/browse/FLINK-17397
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / FileSystem
Reporter: Jingsong Lee






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17390) Container resource cannot be mapped on Hadoop 2.10+

2020-04-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-17390:
---
Labels: pull-request-available  (was: )

> Container resource cannot be mapped on Hadoop 2.10+
> ---
>
> Key: FLINK-17390
> URL: https://issues.apache.org/jira/browse/FLINK-17390
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.11.0
>Reporter: Xintong Song
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> In FLINK-16438, we introduced {{WorkerSpecContainerResourceAdapter}} for 
> mapping Yarn container {{Resource}} to Flink {{WorkerResourceSpec}}. Inside 
> this class, we use {{Resource}} for hash map keys and set elements, assuming 
> that {{Resource}} instances that describe the same set of resources have the 
> same hash code.
> This assumption is not always true. {{Resource}} is an abstract class and may 
> have different implementations. In Hadoop 2.10+, {{LightWeightResource}}, a 
> new implementation of {{Resource}}, is introduced for {{Resource}} instances 
> generated by {{Resource.newInstance}} on the AM side, and it overrides the 
> {{hashCode}} method. That means a {{Resource}} generated on the AM side may 
> have a different hash code than an equal {{Resource}} returned from Yarn.
> To solve this problem, we may introduce an {{InternalResource}} as an inner 
> class of {{WorkerSpecContainerResourceAdapter}}, whose {{hashCode}} method 
> depends only on the fields needed by Flink (at the moment, memory and vcores). 
> {{WorkerSpecContainerResourceAdapter}} should only use {{InternalResource}} 
> for internal state management, and convert any {{Resource}} passed into or 
> returned from it.
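
For illustration, a minimal sketch of the {{InternalResource}} idea described above (an editor's sketch, not the code from the PR; it assumes Hadoop's {{Resource#getMemorySize()}} and {{Resource#getVirtualCores()}} accessors):

{code:java}
final class InternalResource {

	private final long memory;
	private final int vcores;

	InternalResource(long memory, int vcores) {
		this.memory = memory;
		this.vcores = vcores;
	}

	// Conversion from any Hadoop Resource implementation, so that resources
	// coming from different subclasses end up as consistent map keys.
	static InternalResource fromResource(org.apache.hadoop.yarn.api.records.Resource resource) {
		return new InternalResource(resource.getMemorySize(), resource.getVirtualCores());
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (!(o instanceof InternalResource)) {
			return false;
		}
		InternalResource that = (InternalResource) o;
		return memory == that.memory && vcores == that.vcores;
	}

	@Override
	public int hashCode() {
		// Depends only on the fields Flink cares about, unlike the
		// implementation-specific hashCode of Hadoop's Resource subclasses.
		return 31 * Long.hashCode(memory) + vcores;
	}
}
{code}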



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] xintongsong opened a new pull request #11916: [FLINK-17390][yarn] Fix Hadoop 2.10+ compatibility for WorkerSpecContainerResourceAdapter.

2020-04-26 Thread GitBox


xintongsong opened a new pull request #11916:
URL: https://github.com/apache/flink/pull/11916


   ## What is the purpose of the change
   
   This PR fixes the problem that requested/allocated containers cannot be 
matched on Hadoop 2.10+.
   
   The problem is due to relying on the hash code of the Hadoop abstract class 
`Resource` for mapping, which is calculated inconsistently by different 
`Resource` implementations.
   
   ## Verifying this change
   
   - Add 
`WorkerSpecContainerResourceAdapterTest#testMatchResourceWithDifferentImplementation`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] docete commented on a change in pull request #11892: [FLINK-17112][table] Support DESCRIBE statement in Flink SQL

2020-04-26 Thread GitBox


docete commented on a change in pull request #11892:
URL: https://github.com/apache/flink/pull/11892#discussion_r415484898



##
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
##
@@ -721,6 +721,15 @@ abstract class TableEnvImpl(
 dropViewOperation.isIfExists)
 }
 TableResultImpl.TABLE_RESULT_OK
+  case descOperation: DescribeTableOperation =>
+val result = catalogManager.getTable(descOperation.getSqlIdentifier)
+if (result.isPresent) {
+  buildShowResult(Array(result.get().getTable.getSchema.toString))

Review comment:
   IMO the `comment` column is not needed since our SQL parser doesn't 
support column comments (we only support table comments now).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot commented on pull request #11915: [FLINK-17395][python] Add the sign and sha logic for PyFlink wheel packages

2020-04-26 Thread GitBox


flinkbot commented on pull request #11915:
URL: https://github.com/apache/flink/pull/11915#issuecomment-619691350


   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit dc1358b12a72c8fb87289083c585dd8345f41d86 (Mon Apr 27 
03:27:16 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-17395).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-17146) Support conversion between PyFlink Table and Pandas DataFrame

2020-04-26 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-17146:

Fix Version/s: 1.11.0

> Support conversion between PyFlink Table and Pandas DataFrame
> -
>
> Key: FLINK-17146
> URL: https://issues.apache.org/jira/browse/FLINK-17146
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
> Fix For: 1.11.0
>
>
> Pandas dataframe is the de-facto standard for working with tabular data in the 
> Python community. PyFlink table is Flink’s representation of tabular data in 
> Python. It would be nice to provide the ability to convert between the 
> PyFlink table and the Pandas dataframe in the PyFlink Table API, which has the 
> following benefits:
>  * It provides users the ability to switch between PyFlink and Pandas 
> seamlessly when processing data in Python. Users could process data 
> using one execution engine and switch to another seamlessly. For example, it 
> may happen that users already have a Pandas dataframe at hand and want to 
> perform some expensive transformation on it. Then they could convert it to a 
> PyFlink table and leverage the power of the Flink engine. Users could also 
> convert a PyFlink table to a Pandas dataframe and transform it 
> with the rich functionality provided by the Pandas ecosystem.
>  * No intermediate connectors are needed when converting between them.
> More details could be found in 
> [FLIP-120|https://cwiki.apache.org/confluence/display/FLINK/FLIP-120%3A+Support+conversion+between+PyFlink+Table+and+Pandas+DataFrame].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17395) Add the sign and sha logic for PyFlink wheel packages

2020-04-26 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-17395:
---
Labels: pull-request-available  (was: )

> Add the sign and sha logic for PyFlink wheel packages
> -
>
> Key: FLINK-17395
> URL: https://issues.apache.org/jira/browse/FLINK-17395
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> Add the sign and sha logic for PyFlink wheel packages



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] HuangXingBo opened a new pull request #11915: [FLINK-17395][python] Add the sign and sha logic for PyFlink wheel packages

2020-04-26 Thread GitBox


HuangXingBo opened a new pull request #11915:
URL: https://github.com/apache/flink/pull/11915


   ## What is the purpose of the change
   
   *This pull request adds the sign and sha logic for PyFlink wheel 
packages.*
   
   
   ## Brief change log
   
 - *Add the sign and sha logic for PyFlink wheel packages in 
create_binary_release.sh*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-17396) Support SHOW MODULES in Flink SQL

2020-04-26 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-17396:
---

 Summary: Support SHOW MODULES in Flink SQL
 Key: FLINK-17396
 URL: https://issues.apache.org/jira/browse/FLINK-17396
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Reporter: Caizhi Weng


We currently have the {{module}} concept, but SQL users cannot list all 
available modules yet. The SQL client does support this, but the statement is 
parsed by its own parser.
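
For context, the Table API already exposes the module list programmatically; a SHOW MODULES statement would surface the same information in SQL. A minimal sketch of the existing programmatic counterpart (editor's illustration, assuming a Flink 1.10+ {{TableEnvironment}}):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ListModulesExample {

	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
				EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

		// Programmatic counterpart of the proposed SHOW MODULES statement;
		// by default this prints the built-in "core" module.
		for (String module : tEnv.listModules()) {
			System.out.println(module);
		}
	}
}
{code}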



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #11860: [FLINK-17311][python][AZP] Add the logic of compressed in tgz before uploading artifacts

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11860:
URL: https://github.com/apache/flink/pull/11860#issuecomment-617666482


   
   ## CI report:
   
   * 714c695075126b455af4b8cf4d04875d597342a2 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/161799608) Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=189)
 
   * c94cfb4fb285019fc788b683f006b3bd233da1e0 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162139324) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=277)
 
   * db62ebb5051ceaadac054238d6c82d7f2a72 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11839: [FLINK-17166][dist] Modify the log4j-console.properties to also output logs into the files for WebUI

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11839:
URL: https://github.com/apache/flink/pull/11839#issuecomment-617042256


   
   ## CI report:
   
   * 5a26fbb56d996236c296328a6d673bc01982be5f Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/161221412) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7856)
 
   * ab5e8324f0ace63d1e5b3f292dd6d517b056fd21 UNKNOWN
   * 6e097b629bafde50b88d261cd856d624cf6f89c5 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162139302) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=276)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11351: [FLINK-16404][runtime] Avoid caching buffers for blocked input channels before barrier alignment

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11351:
URL: https://github.com/apache/flink/pull/11351#issuecomment-596351676


   
   ## CI report:
   
   * 715889a35cfcc3aaf1b17f39dadaa86f755cc75d UNKNOWN
   * 548b22258f5e87fd53b45c7f4bb6de40bfd4e6d2 UNKNOWN
   * 9179aab74eeaf80ea30c0894f3e5a0171338baed UNKNOWN
   * cd385c55e9dc111a061e19e2387f2ae9ce21369e UNKNOWN
   * e869d82f1aa639c02af4a82c5026f939bf4e8f6c UNKNOWN
   * f86372054c1c4724b8dabc8d06e369475e64ac29 UNKNOWN
   * a0c7ff983988f27f5e47f4c71c8fb1ef28f8a24a UNKNOWN
   * f2ba8a55cb8e441a1377b2cc00957e13e7445e47 UNKNOWN
   * 3f566d7286ffa73fb26b729d76ccb5129ca3f974 UNKNOWN
   * 0058feb1dfb56caaf7e4322948f5b9c05782f8c1 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161384600) Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=42)
 
   * d05bfb1ccc492a22a738da35135e372f7f2c48dc Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162139169) Azure: 
[PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=275)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-17311) Add the logic of compressed in tgz before uploading artifacts

2020-04-26 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo updated FLINK-17311:
-
Summary: Add the logic of compressed in tgz before uploading artifacts  
(was: Add executable permissions to the script in the wheel package)

> Add the logic of compressed in tgz before uploading artifacts
> -
>
> Key: FLINK-17311
> URL: https://issues.apache.org/jira/browse/FLINK-17311
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Build System / Azure Pipelines
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
>  Script files in packages downloaded from Azure will lose executable 
> permissions [https://github.com/microsoft/azure-pipelines-tasks/issues/6364].
> So We need to add the logic of compressing the built result to tgz before 
> uploading artifacts



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17311) Add executable permissions to the script in the wheel package

2020-04-26 Thread Huang Xingbo (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo updated FLINK-17311:
-
Description: 
 Script files in packages downloaded from Azure will lose executable 
permissions [https://github.com/microsoft/azure-pipelines-tasks/issues/6364].

So we need to add the logic of compressing the build result into a tgz archive 
before uploading artifacts.

  was:
 Script files in packages downloaded from Azure will lose executable 
permissions [https://github.com/microsoft/azure-pipelines-tasks/issues/6364].

So we need to add a tool to add executable permissions to the script in the 
wheel package.


> Add executable permissions to the script in the wheel package
> -
>
> Key: FLINK-17311
> URL: https://issues.apache.org/jira/browse/FLINK-17311
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Build System / Azure Pipelines
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
>  Script files in packages downloaded from Azure will lose executable 
> permissions [https://github.com/microsoft/azure-pipelines-tasks/issues/6364].
> So We need to add the logic of compressing the built result to tgz before 
> uploading artifacts



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] godfreyhe commented on a change in pull request #11892: [FLINK-17112][table] Support DESCRIBE statement in Flink SQL

2020-04-26 Thread GitBox


godfreyhe commented on a change in pull request #11892:
URL: https://github.com/apache/flink/pull/11892#discussion_r415473253



##
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
##
@@ -721,6 +721,15 @@ abstract class TableEnvImpl(
 dropViewOperation.isIfExists)
 }
 TableResultImpl.TABLE_RESULT_OK
+  case descOperation: DescribeTableOperation =>
+val result = catalogManager.getTable(descOperation.getSqlIdentifier)
+if (result.isPresent) {
+  buildShowResult(Array(result.get().getTable.getSchema.toString))

Review comment:
   I find the printed result (through `TableResult#print`) of `describe xx` 
looks a little strange:
   ```
   ++
   | result |
   ++
   |  root
|-- a: INT
|-- b: S... |
   ++
   1 row in set
   ```
   
   Users have to parse the unstructured result if they want to use it 
to do something through the `TableResult#collect` method.
   
   How about we return a structured tableau result, e.g. 
   ```
   +-+-+-+
   | name|   type  |   comment   |
   +-+-+-+
   |  a  |   INT   | |
   |  b  |  STRING | |
   +-+-+-+
2 rows in set
   ```
   This is different from the describe result in SQL client.
   
   Another thing we should consider is how to print `watermarkSpecs` and computed 
columns.
   How about we add a column named `expr` to represent `watermarkSpecs` and 
computed columns:
   ```
 +-+-+-+-+
 | name| type|   comment   |   expr  |
 +-+-+-+-+
 |  a  |INT  | |  (NULL) |
 |  b  |INT  | |   a + 1 |
 |  c  |TIMESTAMP(3) | |  (NULL) |
 | WATERMARK   |  (NULL) | | c AS now()  |
 +-+-+-+-+
   ```
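
   For what it's worth, a minimal sketch of how such a structured result could be assembled from a `TableSchema` (an editor's illustration, not the PR's implementation; the `comment` and `expr` values are left empty here since `TableSchema` does not carry column comments):
   ```
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

final class DescribeResultSketch {

	/** One row per column, in the shape name | type | comment | expr. */
	static List<Row> describeRows(TableSchema schema) {
		List<Row> rows = new ArrayList<>();
		String[] names = schema.getFieldNames();
		DataType[] types = schema.getFieldDataTypes();
		for (int i = 0; i < names.length; i++) {
			rows.add(Row.of(names[i], types[i].toString(), null, null));
		}
		return rows;
	}
}
   ```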
   

##
File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
##
@@ -721,6 +721,15 @@ abstract class TableEnvImpl(
 dropViewOperation.isIfExists)
 }
 TableResultImpl.TABLE_RESULT_OK
+  case descOperation: DescribeTableOperation =>
+val result = catalogManager.getTable(descOperation.getSqlIdentifier)
+if (result.isPresent) {
+  buildShowResult(Array(result.get().getTable.getSchema.toString))

Review comment:
   cc @twalthr 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-17395) Add the sign and sha logic for PyFlink wheel packages

2020-04-26 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-17395:


 Summary: Add the sign and sha logic for PyFlink wheel packages
 Key: FLINK-17395
 URL: https://issues.apache.org/jira/browse/FLINK-17395
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Huang Xingbo
 Fix For: 1.11.0


Add the sign and sha logic for PyFlink wheel packages



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #11860: [FLINK-17311][python] Add executable permissions to the script in the wheel package

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11860:
URL: https://github.com/apache/flink/pull/11860#issuecomment-617666482


   
   ## CI report:
   
   * 714c695075126b455af4b8cf4d04875d597342a2 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/161799608) Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=189)
 
   * c94cfb4fb285019fc788b683f006b3bd233da1e0 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-17311) Add executable permissions to the script in the wheel package

2020-04-26 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-17311:

Component/s: Build System / Azure Pipelines

> Add executable permissions to the script in the wheel package
> -
>
> Key: FLINK-17311
> URL: https://issues.apache.org/jira/browse/FLINK-17311
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Build System / Azure Pipelines
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
>  Script files in packages downloaded from Azure will lose executable 
> permissions [https://github.com/microsoft/azure-pipelines-tasks/issues/6364].
> So We need to add a tool to add executable permissions to the script in the 
> wheel package.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #11839: [FLINK-17166][dist] Modify the log4j-console.properties to also output logs into the files for WebUI

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11839:
URL: https://github.com/apache/flink/pull/11839#issuecomment-617042256


   
   ## CI report:
   
   * 5a26fbb56d996236c296328a6d673bc01982be5f Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/161221412) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7856)
 
   * ab5e8324f0ace63d1e5b3f292dd6d517b056fd21 UNKNOWN
   * 6e097b629bafde50b88d261cd856d624cf6f89c5 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11351: [FLINK-16404][runtime] Avoid caching buffers for blocked input channels before barrier alignment

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11351:
URL: https://github.com/apache/flink/pull/11351#issuecomment-596351676


   
   ## CI report:
   
   * 715889a35cfcc3aaf1b17f39dadaa86f755cc75d UNKNOWN
   * 548b22258f5e87fd53b45c7f4bb6de40bfd4e6d2 UNKNOWN
   * 9179aab74eeaf80ea30c0894f3e5a0171338baed UNKNOWN
   * cd385c55e9dc111a061e19e2387f2ae9ce21369e UNKNOWN
   * e869d82f1aa639c02af4a82c5026f939bf4e8f6c UNKNOWN
   * f86372054c1c4724b8dabc8d06e369475e64ac29 UNKNOWN
   * a0c7ff983988f27f5e47f4c71c8fb1ef28f8a24a UNKNOWN
   * f2ba8a55cb8e441a1377b2cc00957e13e7445e47 UNKNOWN
   * 3f566d7286ffa73fb26b729d76ccb5129ca3f974 UNKNOWN
   * 0058feb1dfb56caaf7e4322948f5b9c05782f8c1 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161384600) Azure: 
[FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=42)
 
   * d05bfb1ccc492a22a738da35135e372f7f2c48dc UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 commented on a change in pull request #11791: [FLINK-17210][sql-parser][hive] Implement database DDLs for Hive dialect

2020-04-26 Thread GitBox


danny0405 commented on a change in pull request #11791:
URL: https://github.com/apache/flink/pull/11791#discussion_r415475644



##
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/FlinkSqlParserImplFactory.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation;
+
+import org.apache.flink.sql.parser.hive.impl.FlinkHiveSqlParserImpl;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
+
+import org.apache.calcite.sql.parser.SqlAbstractParserImpl;
+import org.apache.calcite.sql.parser.SqlParserImplFactory;
+import org.apache.calcite.sql.validate.SqlConformance;
+
+import java.io.Reader;
+
+/**
+ * A SqlParserImplFactory that creates the parser according to SqlConformance.
+ */
+public class FlinkSqlParserImplFactory implements SqlParserImplFactory {
+
+   private final SqlConformance conformance;
+
+   public FlinkSqlParserImplFactory(SqlConformance conformance) {
+   this.conformance = conformance;
+   }
+
+   @Override
+   public SqlAbstractParserImpl getParser(Reader stream) {
+   if (conformance == FlinkSqlConformance.HIVE) {
+   return FlinkHiveSqlParserImpl.FACTORY.getParser(stream);
+   } else {
+   return FlinkSqlParserImpl.FACTORY.getParser(stream);
+   }
+   }

Review comment:
   We do not need to implement `SqlParserImplFactory`. How about just a utility 
class named `FlinkSqlParserFactories` with a method that returns the factory 
for a given conformance, e.g. `FlinkSqlParserFactories#createFactory(SqlConformance)`?
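
   A minimal sketch of what such a utility could look like (editor's illustration based on the suggestion above, reusing the `FACTORY` fields of the generated parsers):
   ```
package org.apache.flink.table.planner.delegation;

import org.apache.flink.sql.parser.hive.impl.FlinkHiveSqlParserImpl;
import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;

import org.apache.calcite.sql.parser.SqlParserImplFactory;
import org.apache.calcite.sql.validate.SqlConformance;

/** Utility that picks the generated parser factory for a given conformance. */
public final class FlinkSqlParserFactories {

	private FlinkSqlParserFactories() {
	}

	public static SqlParserImplFactory createFactory(SqlConformance conformance) {
		if (conformance == FlinkSqlConformance.HIVE) {
			return FlinkHiveSqlParserImpl.FACTORY;
		}
		return FlinkSqlParserImpl.FACTORY;
	}
}
   ```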





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] danny0405 commented on a change in pull request #11791: [FLINK-17210][sql-parser][hive] Implement database DDLs for Hive dialect

2020-04-26 Thread GitBox


danny0405 commented on a change in pull request #11791:
URL: https://github.com/apache/flink/pull/11791#discussion_r415474853



##
File path: flink-table/flink-table-planner-blink/pom.xml
##
@@ -100,6 +100,18 @@ under the License.


 
+   
+   org.apache.flink
+   flink-sql-parser-hive
+   ${project.version}
+   
+   

Review comment:
   I'm fine with that ~





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] leonardBang commented on a change in pull request #11909: [FLINK-16103] Translate "Configuration" page of "Table API & SQL" int…

2020-04-26 Thread GitBox


leonardBang commented on a change in pull request #11909:
URL: https://github.com/apache/flink/pull/11909#discussion_r415465118



##
File path: docs/dev/table/config.zh.md
##
@@ -22,29 +22,22 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-By default, the Table & SQL API is preconfigured for producing accurate 
results with acceptable
-performance.
+默认情况下,Table & SQL API 已经预先配置为在产生准确结果的同时性能也可接受。

Review comment:
   > 默认情况下,Table & SQL API 已经预先配置为在产生准确结果的同时性能也可接受。
   
   Table 和 SQL API 的默认配置能够确保结果准确,同时也提供可接受的性能。
   (想了下, 我们不要直译吧,这里意译会通顺些)

##
File path: docs/dev/table/config.zh.md
##
@@ -22,29 +22,22 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-By default, the Table & SQL API is preconfigured for producing accurate 
results with acceptable
-performance.
+默认情况下,Table & SQL API 已经预先配置为在产生准确结果的同时性能也可接受。
 
-Depending on the requirements of a table program, it might be necessary to 
adjust
-certain parameters for optimization. For example, unbounded streaming programs 
may need to ensure
-that the required state size is capped (see [streaming 
concepts](./streaming/query_configuration.html)).
+根据 Table 程序的要求,可能需要调整特定的参数用于优化。例如,无界流程序可能需要保证所需的状态大小是有上限的(请参阅 
[流式概念](./streaming/query_configuration.html)).

Review comment:
   > 根据 Table 程序的要求,可能需要调整特定的参数用于优化。例如,无界流程序可能需要保证所需的状态大小是有上限的
   根据 Table 程序的需求,可能需要调整特定的参数用于优化。例如,无界流程序可能需要保证所需的状态是有限的

##
File path: docs/dev/table/config.zh.md
##
@@ -22,29 +22,22 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-By default, the Table & SQL API is preconfigured for producing accurate 
results with acceptable
-performance.
+默认情况下,Table & SQL API 已经预先配置为在产生准确结果的同时性能也可接受。
 
-Depending on the requirements of a table program, it might be necessary to 
adjust
-certain parameters for optimization. For example, unbounded streaming programs 
may need to ensure
-that the required state size is capped (see [streaming 
concepts](./streaming/query_configuration.html)).
+根据 Table 程序的要求,可能需要调整特定的参数用于优化。例如,无界流程序可能需要保证所需的状态大小是有上限的(请参阅 
[流式概念](./streaming/query_configuration.html)).
 
 * This will be replaced by the TOC
 {:toc}
 
-### Overview
+### 概览
 
-In every table environment, the `TableConfig` offers options for configuring 
the current session.
+在每个 TableEnvironment 中,`TableConfig` 提供用于配置当前会话的选项。
 
-For common or important configuration options, the `TableConfig` provides 
getters and setters methods
-with detailed inline documentation.
+对于常见或者重要的配置选项,`TableConfig` 提供带有详细注释的 `getters` 和 `setters` 方法。
 
-For more advanced configuration, users can directly access the underlying 
key-value map. The following
-sections list all available options that can be used to adjust Flink Table & 
SQL API programs.
+对于更加高级的配置,用户可以直接访问底层的键值 map 对象。以下章节列举了所有可用于调整 Flink Table & SQL API 程序的选项。

Review comment:
   >对于更加高级的配置,用户可以直接访问底层的键值 map 对象。以下章节列举了所有可用于调整 Flink Table & SQL API 
程序的选项。
   
   对于更加高级的配置,用户可以直接访问底层的 key-value 配置项。以下章节列举了所有可用于调整 Flink Table 和 SQL API 
程序的配置项。

##
File path: docs/dev/table/config.zh.md
##
@@ -90,17 +83,16 @@ configuration.set_string("table.exec.mini-batch.size", 
"5000");
 
 
 
-Attention Currently, key-value options 
are only supported
-for the Blink planner.
+注意 目前,键值选项仅被 Blink planner 支持。
 
-### Execution Options
+### 执行选项

Review comment:
   执行配置

##
File path: docs/dev/table/config.zh.md
##
@@ -22,29 +22,22 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-By default, the Table & SQL API is preconfigured for producing accurate 
results with acceptable
-performance.
+默认情况下,Table & SQL API 已经预先配置为在产生准确结果的同时性能也可接受。
 
-Depending on the requirements of a table program, it might be necessary to 
adjust
-certain parameters for optimization. For example, unbounded streaming programs 
may need to ensure
-that the required state size is capped (see [streaming 
concepts](./streaming/query_configuration.html)).
+根据 Table 程序的要求,可能需要调整特定的参数用于优化。例如,无界流程序可能需要保证所需的状态大小是有上限的(请参阅 
[流式概念](./streaming/query_configuration.html)).
 
 * This will be replaced by the TOC
 {:toc}
 
-### Overview
+### 概览
 
-In every table environment, the `TableConfig` offers options for configuring 
the current session.
+在每个 TableEnvironment 中,`TableConfig` 提供用于配置当前会话的选项。

Review comment:
   `TableConfig` 提供用于当前会话的配置项。


##
File path: docs/dev/table/config.zh.md
##
@@ -90,17 +83,16 @@ configuration.set_string("table.exec.mini-batch.size", 
"5000");
 
 
 
-Attention Currently, key-value options 
are only supported
-for the Blink planner.
+注意 目前,键值选项仅被 Blink planner 支持。
 
-### Execution Options
+### 执行选项
 
-The following options can be used to tune the performance of the query 
execution.
+以下选项可用于优化查询执行的性能。
 
 {% include generated/execution_config_configuration.html %}
 
-### Optimizer Options
+### 优化器选项

Review comment:
   优化器配置


[GitHub] [flink] leonardBang commented on a change in pull request #11909: [FLINK-16103] Translate "Configuration" page of "Table API & SQL" int…

2020-04-26 Thread GitBox


leonardBang commented on a change in pull request #11909:
URL: https://github.com/apache/flink/pull/11909#discussion_r415465664



##
File path: docs/dev/table/config.zh.md
##
@@ -22,29 +22,22 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-By default, the Table & SQL API is preconfigured for producing accurate 
results with acceptable
-performance.
+默认情况下,Table & SQL API 已经预先配置为在产生准确结果的同时性能也可接受。
 
-Depending on the requirements of a table program, it might be necessary to 
adjust
-certain parameters for optimization. For example, unbounded streaming programs 
may need to ensure
-that the required state size is capped (see [streaming 
concepts](./streaming/query_configuration.html)).
+根据 Table 程序的要求,可能需要调整特定的参数用于优化。例如,无界流程序可能需要保证所需的状态大小是有上限的(请参阅 
[流式概念](./streaming/query_configuration.html)).

Review comment:
   > 根据 Table 程序的要求,可能需要调整特定的参数用于优化。例如,无界流程序可能需要保证所需的状态大小是有上限的
   
   根据 Table 程序的需求,可能需要调整特定的参数用于优化。例如,无界流程序可能需要保证所需的状态是有限的





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (FLINK-17392) enable configuring minicluster resources in Flink SQL in IDE

2020-04-26 Thread Kurt Young (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-17392:
---
Fix Version/s: (was: 1.11.0)

> enable configuring minicluster resources in Flink SQL in IDE
> 
>
> Key: FLINK-17392
> URL: https://issues.apache.org/jira/browse/FLINK-17392
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: Bowen Li
>Assignee: Kurt Young
>Priority: Major
>
> It's very common case that users who want to learn and test Flink SQL will 
> try to run a SQL job in IDE like Intellij, with Flink minicluster. Currently 
> it's fine to do so with a simple job requiring only one task slot, which is 
> the default resource config of minicluster.
> However, users cannot run even a little bit more complicated job since they 
> cannot configure task slots of minicluster thru Flink SQL, e.g. single 
> parallelism job requires shuffle. This incapability has been very frustrating 
> to new users.
> There are two solutions to this problem:
> - in minicluster, if it is single parallelism job, then chain all operators 
> together
> - enable configuring minicluster in Flink SQL in IDE.
> The latter feels more proper.
> Expected: users can configure minicluster resources via either SQL ("set 
> ...=...") or TableEnvironment ("tEnv.setMiniclusterResources(..., ...)"). 
> [~jark] [~lzljs3620320]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17392) enable configuring minicluster resources in Flink SQL in IDE

2020-04-26 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092930#comment-17092930
 ] 

Jingsong Lee commented on FLINK-17392:
--

I don't fully understand the problem.

> users cannot run even a little bit more complicated job since they cannot 
>configure task slots of minicluster thru Flink SQL, e.g. single parallelism 
>job requires shuffle. This incapability has been very frustrating to new users.

Why do we need to configure slots? IIUC, the MiniCluster slot number depends on 
the maximumParallelism of the job, so it can run any job.

I don't quite understand what the user's problem is.

> enable configuring minicluster resources in Flink SQL in IDE
> 
>
> Key: FLINK-17392
> URL: https://issues.apache.org/jira/browse/FLINK-17392
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: Bowen Li
>Assignee: Kurt Young
>Priority: Major
> Fix For: 1.11.0
>
>
> It's very common case that users who want to learn and test Flink SQL will 
> try to run a SQL job in IDE like Intellij, with Flink minicluster. Currently 
> it's fine to do so with a simple job requiring only one task slot, which is 
> the default resource config of minicluster.
> However, users cannot run even a little bit more complicated job since they 
> cannot configure task slots of minicluster thru Flink SQL, e.g. single 
> parallelism job requires shuffle. This incapability has been very frustrating 
> to new users.
> There are two solutions to this problem:
> - in minicluster, if it is single parallelism job, then chain all operators 
> together
> - enable configuring minicluster in Flink SQL in IDE.
> The latter feels more proper.
> Expected: users can configure minicluster resources via either SQL ("set 
> ...=...") or TableEnvironment ("tEnv.setMiniclusterResources(..., ...)"). 
> [~jark] [~lzljs3620320]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17394) Add RemoteEnvironment and RemoteStreamEnvironment in Python

2020-04-26 Thread Huang Xingbo (Jira)
Huang Xingbo created FLINK-17394:


 Summary: Add RemoteEnvironment and RemoteStreamEnvironment in 
Python
 Key: FLINK-17394
 URL: https://issues.apache.org/jira/browse/FLINK-17394
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Huang Xingbo
 Fix For: 1.11.0


Add RemoteEnvironment and RemoteStreamEnvironment in Python



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17392) enable configuring minicluster resources in Flink SQL in IDE

2020-04-26 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092930#comment-17092930
 ] 

Jingsong Lee edited comment on FLINK-17392 at 4/27/20, 2:08 AM:


Hi, sorry, I don't fully understand the problem either.

> users cannot run even a little bit more complicated job since they cannot 
>configure task slots of minicluster thru Flink SQL, e.g. single parallelism 
>job requires shuffle. This incapability has been very frustrating to new users.

Why do we need to configure slots? IIUC, the MiniCluster slot number depends on 
the maximumParallelism of the job, so it can run any job.

I don't quite understand what the user's problem is.


was (Author: lzljs3620320):
I don't fully understand the problem,

> users cannot run even a little bit more complicated job since they cannot 
>configure task slots of minicluster thru Flink SQL, e.g. single parallelism 
>job requires shuffle. This incapability has been very frustrating to new users.

Why need configure slots? IIUC, Minicluster slot number is depends on 
maximumParallelism of job. So it can run every jobs. 

I don't quite understand what the user's problem is.

> enable configuring minicluster resources in Flink SQL in IDE
> 
>
> Key: FLINK-17392
> URL: https://issues.apache.org/jira/browse/FLINK-17392
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: Bowen Li
>Assignee: Kurt Young
>Priority: Major
> Fix For: 1.11.0
>
>
> It's very common case that users who want to learn and test Flink SQL will 
> try to run a SQL job in IDE like Intellij, with Flink minicluster. Currently 
> it's fine to do so with a simple job requiring only one task slot, which is 
> the default resource config of minicluster.
> However, users cannot run even a little bit more complicated job since they 
> cannot configure task slots of minicluster thru Flink SQL, e.g. single 
> parallelism job requires shuffle. This incapability has been very frustrating 
> to new users.
> There are two solutions to this problem:
> - in minicluster, if it is single parallelism job, then chain all operators 
> together
> - enable configuring minicluster in Flink SQL in IDE.
> The latter feels more proper.
> Expected: users can configure minicluster resources via either SQL ("set 
> ...=...") or TableEnvironment ("tEnv.setMiniclusterResources(..., ...)"). 
> [~jark] [~lzljs3620320]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17289) Translate tutorials/etl.md to chinese

2020-04-26 Thread Li Ying (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092931#comment-17092931
 ] 

Li Ying commented on FLINK-17289:
-

Thank you. I will finish it this week.

> Translate tutorials/etl.md to chinese
> -
>
> Key: FLINK-17289
> URL: https://issues.apache.org/jira/browse/FLINK-17289
> Project: Flink
>  Issue Type: Improvement
>  Components: chinese-translation, Documentation / Training
>Reporter: David Anderson
>Assignee: Li Ying
>Priority: Major
>
> This is one of the new tutorials, and it needs translation. The file is 
> docs/tutorials/etl.zh.md.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17392) enable configuring minicluster resources in Flink SQL in IDE

2020-04-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092914#comment-17092914
 ] 

Jark Wu edited comment on FLINK-17392 at 4/27/20, 1:50 AM:
---

Hi [~phoenixjiangnan], sorry, I don't fully understand this issue. 

> in minicluster, if it is single parallelism job, then chain all operators 
> together
Why do users want to avoid shuffling in the minicluster? For performance? Why 
do users care about performance in the minicluster? I think the minicluster is 
not used for production for now. 
However, removing shuffling is on our roadmap, but not at a high priority. 
Removing shuffling shouldn't be tied to the minicluster. But this is not a small 
effort; we need to be careful with the keyed state.

> enable configuring minicluster in Flink SQL in IDE.
I don't think exposing minicluster to TableEnvironment is a good idea. Even 
{{StreamExecutionEnvironment}} doesn't provide this. If users want to modify 
resources. He can just set parallelism using the {{parallelism.default}} 
configuration.




was (Author: jark):
Hi [~phoenixjiangnan], sorry, I'm not fully understand. 

> in minicluster, if it is single parallelism job, then chain all operators 
> together
Why do users want to avoid shuffling in minicluster? Performance purpose? Why 
do users care performance in minicluster? I think minicluster is not used for 
production for now. 
However, removing shuffling is in our roadmap but not in a high priority. 
Removing shuffling shouldn't be bound by minicluster. But this is not a small 
effort, we need to be careful with the keyed state.

> enable configuring minicluster in Flink SQL in IDE.
I don't think exposing minicluster to TableEnvironment is a good idea. Even 
{{StreamExecutionEnvironment}} doesn't provide this. If users want to modify 
resources. He can just set parallelism using the {{parallelism.default}} 
configuration.



> enable configuring minicluster resources in Flink SQL in IDE
> 
>
> Key: FLINK-17392
> URL: https://issues.apache.org/jira/browse/FLINK-17392
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: Bowen Li
>Assignee: Kurt Young
>Priority: Major
> Fix For: 1.11.0
>
>
> It's very common case that users who want to learn and test Flink SQL will 
> try to run a SQL job in IDE like Intellij, with Flink minicluster. Currently 
> it's fine to do so with a simple job requiring only one task slot, which is 
> the default resource config of minicluster.
> However, users cannot run even a little bit more complicated job since they 
> cannot configure task slots of minicluster thru Flink SQL, e.g. single 
> parallelism job requires shuffle. This incapability has been very frustrating 
> to new users.
> There are two solutions to this problem:
> - in minicluster, if it is single parallelism job, then chain all operators 
> together
> - enable configuring minicluster in Flink SQL in IDE.
> The latter feels more proper.
> Expected: users can configure minicluster resources via either SQL ("set 
> ...=...") or TableEnvironment ("tEnv.setMiniclusterResources(..., ...)"). 
> [~jark] [~lzljs3620320]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17392) enable configuring minicluster resources in Flink SQL in IDE

2020-04-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092914#comment-17092914
 ] 

Jark Wu commented on FLINK-17392:
-

Hi [~phoenixjiangnan], sorry, I don't fully understand. 

> in minicluster, if it is single parallelism job, then chain all operators 
> together
Why do users want to avoid shuffling in the minicluster? For performance? Why 
do users care about performance in the minicluster? I think the minicluster is 
not used for production for now. 
However, removing shuffling is on our roadmap, but not at a high priority. 
Removing shuffling shouldn't be tied to the minicluster. But this is not a small 
effort; we need to be careful with the keyed state.

> enable configuring minicluster in Flink SQL in IDE.
I don't think exposing the minicluster to TableEnvironment is a good idea. Even 
{{StreamExecutionEnvironment}} doesn't provide this. If users want to modify 
resources, they can just set the parallelism using the {{parallelism.default}} 
configuration.
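
For example (editor's sketch, not from the thread; the package names assume the 1.11 layout and the parallelism value of 4 is arbitrary), a table program run from the IDE could set the default parallelism explicitly:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class LocalSqlJob {

	public static void main(String[] args) {
		// "parallelism.default" as a typed option; 4 is an arbitrary example value.
		Configuration conf = new Configuration();
		conf.setInteger(CoreOptions.DEFAULT_PARALLELISM, 4);

		StreamExecutionEnvironment env =
				StreamExecutionEnvironment.createLocalEnvironment(4, conf);
		StreamTableEnvironment tEnv = StreamTableEnvironment.create(
				env,
				EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

		// ... define and execute the SQL job with tEnv ...
	}
}
{code}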



> enable configuring minicluster resources in Flink SQL in IDE
> 
>
> Key: FLINK-17392
> URL: https://issues.apache.org/jira/browse/FLINK-17392
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: Bowen Li
>Assignee: Kurt Young
>Priority: Major
> Fix For: 1.11.0
>
>
> It's very common case that users who want to learn and test Flink SQL will 
> try to run a SQL job in IDE like Intellij, with Flink minicluster. Currently 
> it's fine to do so with a simple job requiring only one task slot, which is 
> the default resource config of minicluster.
> However, users cannot run even a little bit more complicated job since they 
> cannot configure task slots of minicluster thru Flink SQL, e.g. single 
> parallelism job requires shuffle. This incapability has been very frustrating 
> to new users.
> There are two solutions to this problem:
> - in minicluster, if it is single parallelism job, then chain all operators 
> together
> - enable configuring minicluster in Flink SQL in IDE.
> The latter feels more proper.
> Expected: users can configure minicluster resources via either SQL ("set 
> ...=...") or TableEnvironment ("tEnv.setMiniclusterResources(..., ...)"). 
> [~jark] [~lzljs3620320]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-17311) Add executable permissions to the script in the wheel package

2020-04-26 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-17311:
---

Assignee: Huang Xingbo

> Add executable permissions to the script in the wheel package
> -
>
> Key: FLINK-17311
> URL: https://issues.apache.org/jira/browse/FLINK-17311
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Huang Xingbo
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
>  Script files in packages downloaded from Azure will lose executable 
> permissions [https://github.com/microsoft/azure-pipelines-tasks/issues/6364].
> So We need to add a tool to add executable permissions to the script in the 
> wheel package.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-17268) SourceReaderTestBase.testAddSplitToExistingFetcher deadlocks on Travis

2020-04-26 Thread Jiangjie Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin closed FLINK-17268.

Resolution: Fixed

Merged to master.

90a38e8292b629e3d91b55941df00568d60542de

> SourceReaderTestBase.testAddSplitToExistingFetcher deadlocks on Travis
> --
>
> Key: FLINK-17268
> URL: https://issues.apache.org/jira/browse/FLINK-17268
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Common
>Affects Versions: 1.11.0
>Reporter: Till Rohrmann
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>
> The {{SourceReaderTestBase.testAddSplitToExistingFetcher}} deadlocks on 
> Travis with the following stack trace:
> {code}
> Picked up JAVA_TOOL_OPTIONS: -XX:+HeapDumpOnOutOfMemoryError
> 2020-04-20 11:40:52
> Full thread dump OpenJDK 64-Bit Server VM (25.242-b08 mixed mode):
> "Attach Listener" #16 daemon prio=9 os_prio=0 tid=0x7f640046d000 
> nid=0x67b3 waiting on condition [0x]
>java.lang.Thread.State: RUNNABLE
> "SourceFetcher" #15 prio=5 os_prio=0 tid=0x7f6400abb000 nid=0x647a 
> waiting on condition [0x7f63e6c4f000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x8fba81e0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>   at 
> java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
>   at 
> java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:104)
>   at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:85)
>   at 
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> "process reaper" #11 daemon prio=10 os_prio=0 tid=0x7f6400774000 
> nid=0x6469 waiting on condition [0x7f63e718e000]
>java.lang.Thread.State: TIMED_WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0x80054278> (a 
> java.util.concurrent.SynchronousQueue$TransferStack)
>   at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>   at 
> java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
>   at 
> java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
>   at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
>   at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>   at 
> java.util.concurrent.ThreadPoolExecutor$WorknputStream.readInt(DataInputStream.java:387)
>   at 
> org.apache.maven.surefire.booter.MasterProcessCommand.decode(MasterProcessCommand.java:115)
>   at 
> org.apache.maven.surefire.booter.CommandReader$CommandRunnable.run(CommandReader.java:391)
>   at java.lang.Thread.run(Thread.java:748)
> "Service Thread" #8 daemon prio=9 os_prio=0 tid=0x7f6400203000 nid=0x645a 
> runnable [0x]
>java.lang.Thread.State: RUNNABLE
> "C1 CompilerThread1" #7 daemon prio=9 os_prio=0 tid=0x7f640020 
> nid=0x6458 waiting on condition [0x]
>java.lang.Thread.State: RUNNABLE
> "C2 CompilerThread0" #6 daemon prio=9 os_prio=0 tid=0x7f64001fd800 
> nid=0x6457 waiting on condition [0x]
>java.lang.Thread.State: RUNNABLE
> "Signal Dispatcher" #5 daemon prio=9 os_prio=0 tid=0x7f64001fb800 
> nid=0x6456 runnable [0x]
>java.lang.Thread.State: RUNNABLE
> "Surrogate Locker Thread (Concurrent GC)" #4 daemon prio=9 os_prio=0 
> tid=0x7f64001fa000 nid=0x6454 waiting on condition [0x]
>java.lang.Thread.State: RUNNABLE
> "Finalizer" #3 daemon prio=8 os_prio=0 tid=0x7f64001c9800 nid=0x6450 in 
> Object.wait() [0x7f63e9032000]
>java.lang.Thread.State: WAITING (on 

[jira] [Created] (FLINK-17393) Improve the `FutureCompletingBlockingQueue` to wakeup blocking put() more elegantly.

2020-04-26 Thread Jiangjie Qin (Jira)
Jiangjie Qin created FLINK-17393:


 Summary: Improve the `FutureCompletingBlockingQueue` to wakeup 
blocking put() more elegantly.
 Key: FLINK-17393
 URL: https://issues.apache.org/jira/browse/FLINK-17393
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common
Reporter: Jiangjie Qin


Currently, if a {{FetchTask}} is blocked on 
{{FutureCompletingBlockingQueue.put()}}, an interrupt has to be issued to wake 
it up, which will result in an {{InterruptedException}}. We can avoid the 
interruption by having our own implementation of {{BlockingQueue.put()}}.
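
A rough sketch of the idea (illustrative names, not the actual 
{{FutureCompletingBlockingQueue}}): the producer blocked in {{put()}} is 
released by a wakeup flag plus a signal instead of a thread interrupt.

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class WakeupableQueue<E> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Queue<E> elements = new ArrayDeque<>();
    private final int capacity;
    private boolean wakeup;

    public WakeupableQueue(int capacity) {
        this.capacity = capacity;
    }

    /** Returns true if the element was enqueued, false if woken up before space was available. */
    public boolean put(E element) {
        lock.lock();
        try {
            while (elements.size() >= capacity) {
                if (wakeup) {
                    wakeup = false;
                    return false;
                }
                notFull.awaitUninterruptibly(); // no InterruptedException on the producer path
            }
            elements.add(element);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Consumer side: frees a slot and signals a blocked producer. */
    public E poll() {
        lock.lock();
        try {
            E next = elements.poll();
            if (next != null) {
                notFull.signal();
            }
            return next;
        } finally {
            lock.unlock();
        }
    }

    /** Wakes up a producer blocked in put() without interrupting its thread. */
    public void wakeupPut() {
        lock.lock();
        try {
            wakeup = true;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
{code}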



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] becketqin commented on pull request #11856: [FLINK-17268][connector/common] Fix the bug of missing records during SplitFetcher wakeup.

2020-04-26 Thread GitBox


becketqin commented on pull request #11856:
URL: https://github.com/apache/flink/pull/11856#issuecomment-619655010


   > The fact that this Interruption-based flow is hard to get right looks like 
it is at least part of the reason for the original bug, so I think it is worth 
thinking about.
   
   The interruption could happen in two cases:
   1. Thrown from `SplitFetcher.wakeup();`
   2. Thrown from `BlockingQueue.put()` in `FetchTask.run()`;
   
   There isn't much we can do in the first case, but we can probably have an 
implementation of `BlockingQueue` (just like the handover you mentioned) to 
avoid `InterruptedException` on `put()` in the second case.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] becketqin commented on pull request #11856: [FLINK-17268][connector/common] Fix the bug of missing records during SplitFetcher wakeup.

2020-04-26 Thread GitBox


becketqin commented on pull request #11856:
URL: https://github.com/apache/flink/pull/11856#issuecomment-619653062


   @StephanEwen Thanks for the review. You are right, the wakeup is difficult 
to get right. I found that the major difficulty comes from the effort to have a 
"fine-grained" wakeup, i.e. to avoid waking up the `SplitFetcher` unnecessarily, 
because we don't know if the call to `SplitFetcher.wakeup()` is always 
efficient. Maybe we don't really need to worry about this; if so, the code 
would be much more straightforward with a "coarse-grained" wakeup.
   
   I am curious about your suggestion of using `LockSupport`. I would be glad 
to hear the idea.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] StephanEwen edited a comment on pull request #11856: [FLINK-17268][connector/common] Fix the bug of missing records during SplitFetcher wakeup.

2020-04-26 Thread GitBox


StephanEwen edited a comment on pull request #11856:
URL: https://github.com/apache/flink/pull/11856#issuecomment-619636516


   The changes look fine to me, +1 to merge this.
   
   Maybe some comments for separate improvements, as follow-ups:
 - The use of `AtomicBoolean` is a bit confusing. It looks like these are 
mainly `volatile boolean` to report status with a `happens-before` 
ordering/visibility.
   
 - The whole "interrupting"-based flow control is difficult to follow, and 
I believe very difficult to get right. One interrupt flag clearing/setting can 
make the difference between correctness, a deadlock, or a "hot loop lock". And 
the interrupt flags are hellishly hard to reason about, especially because they 
can be mutated by any component outside the fetcher.
   
   One way to solve this is by turning the fetch queue into a "wakeup-able 
queue". That pulls a good amount of the wakeup logic out of the fetcher 
class, making that one easier to read and to test. The 
[Handover](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java)
 in the current KafkaConsumer is like a "wakeup-able size-one queue", as an 
example. We probably need something slightly different (waking up the consumer 
of the queue). A simple notify/wait implementation is quite straightforward, 
and if we have time we can make this more efficient with Java's `LockSupport` 
classes (a rough sketch follows at the end of this comment).
   
 - The fact that this Interruption-based flow is hard to get right looks 
like it is at least part of the reason for the original bug, so I think it is 
worth thinking about.
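   
   For illustration only (names are made up, not Flink classes), a minimal 
`LockSupport`-based variant of such a wakeup-able handover, with the wakeup 
targeting the consumer side of the queue, could look like this:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

public class ParkingHandover<E> {
    private final Queue<E> queue = new ConcurrentLinkedQueue<>();
    private volatile Thread consumer;   // thread currently blocked in poll(), if any
    private volatile boolean wakeup;    // external wakeup request

    public void add(E element) {
        queue.add(element);
        LockSupport.unpark(consumer);   // no-op if no consumer is parked
    }

    /** Blocks until an element is available or wakeup() is called; returns null on wakeup. */
    public E poll() {
        consumer = Thread.currentThread();
        try {
            while (true) {
                E next = queue.poll();
                if (next != null) {
                    return next;
                }
                if (wakeup) {
                    wakeup = false;
                    return null;
                }
                LockSupport.park(this); // spurious wakeups are fine, we simply loop
            }
        } finally {
            consumer = null;
        }
    }

    /** Wakes up a consumer blocked in poll() without interrupting its thread. */
    public void wakeup() {
        wakeup = true;
        LockSupport.unpark(consumer);
    }
}
```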



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] StephanEwen commented on pull request #11856: [FLINK-17268][connector/common] Fix the bug of missing records during SplitFetcher wakeup.

2020-04-26 Thread GitBox


StephanEwen commented on pull request #11856:
URL: https://github.com/apache/flink/pull/11856#issuecomment-619636516


   The changes look fine to me, +1 to merge this.
   
   Maybe some comments for separate improvements, as follow-ups:
 - The use of `AtomicBoolean` is a bit confusing. It looks like these are 
mainly `volatile boolean` to report status with a `happens-before` 
ordering/visibility.
   
 - The whole "interrupting"-based flow control is difficult to follow, and 
I believe very difficult to get right. One interrupt flag clearing/setting can 
make the difference between correctness, a deadlock, or a "hot loop lock". And 
the interrupt flags are hellishly hard to reason about, especially because they 
can be mutated by any component outside the fetcher.
   
   One way to solve this is by turning the fetch queue into a "wakeup-able 
queue". That pulls a good amount of the wakeup logic out of the fetcher 
class, making that one easier to read and to test. The 
[Handover](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Handover.java)
 in the current KafkaConsumer is like a "wakeup-able size-one queue", as an 
example. We probably need something slightly different (waking up the consumer 
of the queue). A simple notify/wait implementation is quite straightforward, 
and if we have time we can make this more efficient with Java's `LockSupport` 
classes.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (FLINK-17392) enable configuring minicluster in Flink SQL in IDE

2020-04-26 Thread Bowen Li (Jira)
Bowen Li created FLINK-17392:


 Summary: enable configuring minicluster in Flink SQL in IDE
 Key: FLINK-17392
 URL: https://issues.apache.org/jira/browse/FLINK-17392
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.11.0
Reporter: Bowen Li
Assignee: Kurt Young
 Fix For: 1.11.0


It's a very common case that users who want to learn and test Flink SQL will 
try to run a SQL job in an IDE like IntelliJ, with the Flink minicluster. 
Currently it's fine to do so with a simple job requiring only one task slot, 
which is the default resource config of the minicluster.

However, users cannot run even a slightly more complicated job, since they 
cannot configure the task slots of the minicluster through Flink SQL, e.g. a 
single-parallelism job that requires a shuffle. This limitation has been very 
frustrating to new users.

There are two solutions to this problem:
- in minicluster, if it is single parallelism job, then chain all operators 
together
- enable configuring minicluster in Flink SQL in IDE.

The latter feels more proper.

Expected: users can configure minicluster resources via either SQL ("set 
...=...") or TableEnvironment ("tEnv.setMiniclusterResources(..., ...)"). 

[~jark] [~lzljs3620320]
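
For reference, a workaround that already works today when the job is set up 
through the DataStream entry point (class/package names assume 1.11; the 
{{tEnv.setMiniclusterResources(..., ...)}} call above is only a proposal and 
does not exist yet):

{code:java}
// Sketch only: start the IDE minicluster with more task slots by passing an
// explicit Configuration to the local environment.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class LocalMiniClusterJob {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4, conf);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // ... register tables and run SQL statements as usual ...
    }
}
{code}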




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17392) enable configuring minicluster resources in Flink SQL in IDE

2020-04-26 Thread Bowen Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-17392:
-
Summary: enable configuring minicluster resources in Flink SQL in IDE  
(was: enable configuring minicluster in Flink SQL in IDE)

> enable configuring minicluster resources in Flink SQL in IDE
> 
>
> Key: FLINK-17392
> URL: https://issues.apache.org/jira/browse/FLINK-17392
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.11.0
>Reporter: Bowen Li
>Assignee: Kurt Young
>Priority: Major
> Fix For: 1.11.0
>
>
> It's a very common case that users who want to learn and test Flink SQL will 
> try to run a SQL job in an IDE like IntelliJ, with the Flink minicluster. 
> Currently it's fine to do so with a simple job requiring only one task slot, 
> which is the default resource config of the minicluster.
> However, users cannot run even a slightly more complicated job, since they 
> cannot configure the task slots of the minicluster through Flink SQL, e.g. a 
> single-parallelism job that requires a shuffle. This limitation has been very 
> frustrating to new users.
> There are two solutions to this problem:
> - in minicluster, if it is single parallelism job, then chain all operators 
> together
> - enable configuring minicluster in Flink SQL in IDE.
> The latter feels more proper.
> Expected: users can configure minicluster resources via either SQL ("set 
> ...=...") or TableEnvironment ("tEnv.setMiniclusterResources(..., ...)"). 
> [~jark] [~lzljs3620320]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] bowenli86 commented on pull request #11914: [FLINK-17385][jdbc][postgres] Handled problem of numeric with 0 precision

2020-04-26 Thread GitBox


bowenli86 commented on pull request #11914:
URL: https://github.com/apache/flink/pull/11914#issuecomment-619619104


   LGTM, except for one thing: can you also add array tests in 
PostgresCatalogITCase#testArrayTypes()?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-17012) Expose stage of task initialization

2020-04-26 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092839#comment-17092839
 ] 

Stephan Ewen commented on FLINK-17012:
--

Given that this is not a "critically urgent" feature (at least as I understand 
it so far), should we go with the proper solution?
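
For illustration only (names are made up, not existing Flink classes), exposing 
the initialization stage described in the issue quoted below as a per-task 
metric could look roughly like this:

{code:java}
import java.util.concurrent.atomic.AtomicReference;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public class InitializationStageMetric {
    public enum Stage { DEPLOYING, RESTORING_STATE, OPENING_OPERATORS, RUNNING }

    private final AtomicReference<Stage> stage = new AtomicReference<>(Stage.DEPLOYING);

    /** Registers a gauge so "RUNNING but still restoring state" becomes visible in metrics/UI. */
    public void register(MetricGroup group) {
        group.gauge("initializationStage", (Gauge<String>) () -> stage.get().name());
    }

    public void setStage(Stage newStage) {
        stage.set(newStage);
    }
}
{code}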

> Expose stage of task initialization
> ---
>
> Key: FLINK-17012
> URL: https://issues.apache.org/jira/browse/FLINK-17012
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics, Runtime / Task
>Reporter: Wenlong Lyu
>Priority: Major
>
> Currently a task switches to running before it is fully initialized; this 
> does not take state initialization and operator initialization (#open) into 
> account, which may take a long time to finish. As a result, there can be a 
> weird phenomenon where all tasks are running but the throughput is 0. 
> I think it would be good if we could expose the initialization stage of tasks. 
> What do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on pull request #11906: [FLINK-17356][jdbc][postgres] Support PK and Unique constraints

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11906:
URL: https://github.com/apache/flink/pull/11906#issuecomment-619214462


   
   ## CI report:
   
   * 9d527f923222f1fdf7cb43cc634435cda1248dec Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/162068781) Azure: 
[CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=270)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11867: [FLINK-17309][e2e tests][WIP]TPC-DS fail to run data generator

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11867:
URL: https://github.com/apache/flink/pull/11867#issuecomment-617863974


   
   ## CI report:
   
   * 6d991e80c512cef7385b083446e761625507f5e4 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/162072570) Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=272)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] flinkbot edited a comment on pull request #11791: [FLINK-17210][sql-parser][hive] Implement database DDLs for Hive dialect

2020-04-26 Thread GitBox


flinkbot edited a comment on pull request #11791:
URL: https://github.com/apache/flink/pull/11791#issuecomment-615162346


   
   ## CI report:
   
   * a6a3a9e51fd9ba484d1ab4ccd0646dcacc0692ff Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/162072493) Azure: 
[SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=271)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-16611) Datadog reporter should chunk large reports

2020-04-26 Thread Stephen Whelan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092780#comment-17092780
 ] 

Stephen Whelan commented on FLINK-16611:


Hi [~yitz589],

 

I was waiting to get some feedback on the above solution before opening a PR. I 
suppose I can move forward with it and receive comments on the PR.
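
For what it's worth, the chunking described in the issue below can be sketched 
independently of the reporter internals (the actual size limit still needs to 
be determined; the class name is illustrative):

{code:java}
import java.util.ArrayList;
import java.util.List;

public final class ReportChunker {
    /** Splits a flat list of metric series into chunks of at most maxSeriesPerRequest entries. */
    public static <T> List<List<T>> chunk(List<T> series, int maxSeriesPerRequest) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < series.size(); i += maxSeriesPerRequest) {
            chunks.add(new ArrayList<>(
                    series.subList(i, Math.min(i + maxSeriesPerRequest, series.size()))));
        }
        return chunks; // one HTTP request would then be sent per chunk
    }
}
{code}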

> Datadog reporter should chunk large reports
> ---
>
> Key: FLINK-16611
> URL: https://issues.apache.org/jira/browse/FLINK-16611
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> Datadog has a maximum size for reports that it accepts.
> If the report exceeds this size it is simply rejected, rendering the reporter 
> unusable.
> We should investigate what this size limit is, and split the report into 
> multiple chunks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16638) Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs

2020-04-26 Thread Eduardo Winpenny Tejedor (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092761#comment-17092761
 ] 

Eduardo Winpenny Tejedor edited comment on FLINK-16638 at 4/26/20, 4:14 PM:


Hi [~pnowojski] and [~basharaj], I volunteer to fix this one; Bashar already 
offered most of the "just do it" solution. I also suggest using this 
opportunity to clean up the code.

[JobVertex|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java#L59]
 has a field called _idAlternatives_ that is only used in a test and in a 
separate function that is also only invoked from a unit test (i.e. it can be 
deleted).

It also has the fields _operatorIDs_ and _operatorIdsAlternatives_. Their 
getters are called in similar situations; _operatorIdsAlternatives_'s getter 
has an extra invocation in 
[ExecutionJobVertex::includeAlternativeOperatorIDs|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java#L667]
 that only makes up for a previous call that fetched only the _operatorIDs_. It 
seems like both concepts are tightly related and should be encapsulated in a 
single entity. Not surprisingly, the bug we're tackling here is due to 
retrieving one set of ids while forgetting the other.

 

There are 6 production code invocations to 
[ExecutionJobVertex::getOperatorIDs|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java#L296]:
 - The call in [Checkpoints::loadAndValidateCheckpoint|#L140] is soon followed 
by the previously mentioned 
_ExecutionJobVertex::includeAlternativeOperatorIDs_. Reinforcing the argument 
that both are related.

 * The call in 
[StateAssignmentOperation::assignStates|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L89]
 is directly followed by a call to retrieve the operator id alternatives, also 
reinforcing the argument.
 * The call in 
[StateAssignmentOperation::checkStateMappingCompleteness|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L558]
 is the one with the bug!

 - The other 3 in 
[StateAssignmentOperation::assignTaskStateToExecutionJobVertices|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L217],
 
[StateAssignmentOperation::assignAttemptState|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L118],
 and 
[PendingCheckpoint::acknowledgeTask|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L374]
 are not followed by a retrieval of the alternative ids, nor do they need one. 
Why they don't need one puzzles me, and they're the only reason not to 
encapsulate both sets of ids together. If someone could explain this, we might 
be able to leave the code pretty tidy!

I volunteer for either the quick, dirty fix or the longer, cleaner solution. 
The first should take me a couple of days, whereas the second should take me 
~4 days (considering I've already done a lot of the groundwork locally). I'm 
open to suggestions; a sketch of the encapsulation idea follows below.
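
To make the encapsulation idea concrete, a sketch of what such a single entity 
could look like (illustrative only, not an existing Flink class):

{code:java}
import java.util.Optional;

import org.apache.flink.runtime.jobgraph.OperatorID;

public final class OperatorIds {
    private final OperatorID generated;
    private final OperatorID userDefined; // the "alternative" id, may be null

    public OperatorIds(OperatorID generated, OperatorID userDefined) {
        this.generated = generated;
        this.userDefined = userDefined;
    }

    public OperatorID getGenerated() {
        return generated;
    }

    /** Callers are forced to consider the user-defined id explicitly instead of forgetting it. */
    public Optional<OperatorID> getUserDefined() {
        return Optional.ofNullable(userDefined);
    }
}
{code}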

 

 


was (Author: edu05):
Hi [~pnowojski] and [~basharaj] I volunteer to fix this one, Bashar already 
offered most of the "just do it" solution. I also suggest this opportunity to 
clean up the code.

[JobVertex|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java#L59]
 has a field called _idAlternatives_ that is only used in a test and in a 
separate function that is also only invoked from a unit test (i.e. it can be 
deleted).

It also has fields operatorIDs and operatorIdsAlternatives , their getters are 
called in similar situations, operatorIdsAlternatives's getter has an extra 
invocation in 
[ExecutionJobVertex::includeAlternativeOperatorIDs|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java#L667]
 that is only making up for a previous call to get only the operatorIDs. It 
seems like both concepts are tightly related and should be encapsulated in a 
single entity. Not surprisingly the bug we're tackling here 

[jira] [Comment Edited] (FLINK-16638) Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs

2020-04-26 Thread Eduardo Winpenny Tejedor (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092761#comment-17092761
 ] 

Eduardo Winpenny Tejedor edited comment on FLINK-16638 at 4/26/20, 4:13 PM:


Hi [~pnowojski] and [~basharaj], I volunteer to fix this one; Bashar already 
offered most of the "just do it" solution. I also suggest using this 
opportunity to clean up the code.

[JobVertex|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java#L59]
 has a field called _idAlternatives_ that is only used in a test and in a 
separate function that is also only invoked from a unit test (i.e. it can be 
deleted).

It also has the fields operatorIDs and operatorIdsAlternatives. Their getters 
are called in similar situations; operatorIdsAlternatives's getter has an extra 
invocation in 
[ExecutionJobVertex::includeAlternativeOperatorIDs|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java#L667]
 that only makes up for a previous call that fetched only the operatorIDs. It 
seems like both concepts are tightly related and should be encapsulated in a 
single entity. Not surprisingly, the bug we're tackling here is due to 
retrieving one set of ids while forgetting the other.

 

There are 6 production code invocations to 
[ExecutionJobVertex::getOperatorIDs|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java#L296]:
 - The call in [Checkpoints::loadAndValidateCheckpoint|#L140] is soon followed 
by the previously mentioned 
_ExecutionJobVertex::includeAlternativeOperatorIDs_. Reinforcing the argument 
that both are related.

 * The call in 
[StateAssignmentOperation::assignStates|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L89]
 is directly followed by a call to retrieve the operator id alternatives, also 
reinforcing the argument.
 * The call in 
[StateAssignmentOperation::checkStateMappingCompleteness|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L558]
 is the one with the bug!

 - The other 3 in 
[StateAssignmentOperation::assignTaskStateToExecutionJobVertices|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L217],
 
[StateAssignmentOperation::assignAttemptState|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L118],
 and 
[PendingCheckpoint::acknowledgeTask|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L374]
 are not followed by a retrieval of the alternative ids, nor do they need one. 
Why they don't need one puzzles me, and they're the only reason not to 
encapsulate both sets of ids together. If someone could explain this, we might 
be able to leave the code pretty tidy!

I volunteer for either the quick, dirty fix or the longer, cleaner solution. 
The first should take me a couple of days, whereas the second should take me 
~4 days (considering I've already done a lot of the groundwork locally). I'm 
open to suggestions.

 

 


was (Author: edu05):
Hi [~pnowojski] and [~basharaj] I volunteer to fix this one, Bashar already 
offered most of the "just do it" solution. I also suggest this opportunity to 
clean up the code.

[JobVertex|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java#L59]
 has a field called _idAlternatives_ that is only used in a test and in a 
separate function that is also only invoked from a unit test (i.e. it can be 
deleted).

It also has fields operatorIDs and operatorIdsAlternatives , their getters are 
called in similar situations, operatorIdsAlternatives's getter has an extra 
invocation in 
[ExecutionJobVertex::includeAlternativeOperatorIDs|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java#L667]
 that is only making up for a previous call to get only the operatorIDs. It 
seems like both concepts are tightly related and should be encapsulated in a 
single entity. Not surprisingly the bug we're tackling here is due 

[jira] [Comment Edited] (FLINK-16638) Flink checkStateMappingCompleteness doesn't include UserDefinedOperatorIDs

2020-04-26 Thread Eduardo Winpenny Tejedor (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092761#comment-17092761
 ] 

Eduardo Winpenny Tejedor edited comment on FLINK-16638 at 4/26/20, 4:11 PM:


Hi [~pnowojski] and [~basharaj], I volunteer to fix this one; Bashar already 
offered most of the "just do it" solution. I also suggest using this 
opportunity to clean up the code.

[JobVertex|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java#L59]
 has a field called _idAlternatives_ that is only used in a test and in a 
separate function that is also only invoked from a unit test (i.e. it can be 
deleted).

It also has the fields operatorIDs and operatorIdsAlternatives. Their getters 
are called in similar situations; operatorIdsAlternatives's getter has an extra 
invocation in 
[ExecutionJobVertex::includeAlternativeOperatorIDs|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java#L667]
 that only makes up for a previous call that fetched only the operatorIDs. It 
seems like both concepts are tightly related and should be encapsulated in a 
single entity. Not surprisingly, the bug we're tackling here is due to 
retrieving one set of ids while forgetting the other.

 

There are 6 production code invocations to 
[ExecutionJobVertex::getOperatorIDs|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java#L296]:
 - The call in [Checkpoints::loadAndValidateCheckpoint|#L140] is soon followed 
by the previously mentioned 
_ExecutionJobVertex::includeAlternativeOperatorIDs_. Reinforcing the argument 
that both are related.

 * The call in 
[StateAssignmentOperation::assignStates|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L89]
 is directly followed by a call to retrieve the operator id alternatives, also 
reinforcing the argument.
 * The call in 
[StateAssignmentOperation::checkStateMappingCompleteness|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L558]
 is the one with the bug!

 - The other 3 in 
[StateAssignmentOperation::assignTaskStateToExecutionJobVertices|[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L217]],
 
[StateAssignmentOperation::assignAttemptState|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java#L118],
 and [PendingCheckpoint::acknowledgeTask| 
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java]]
 are not followed by a retrieval of the alternative ids, nor do they need one. 
Why they don't need one puzzles me, and they're the only reason not to 
encapsulate both sets of ids together. If someone could explain this, we might 
be able to leave the code pretty tidy!

I volunteer for either the quick, dirty fix or the longer, cleaner solution. 
The first should take me a couple of days, whereas the second should take me 
~4 days (considering I've already done a lot of the groundwork locally). I'm 
open to suggestions.

 

 


was (Author: edu05):
Hi [~pnowojski] and [~basharaj] I volunteer to fix this one, Bashar already 
offered most of the "just do it" solution. I also suggest this opportunity to 
clean up the code.

[JobVertex|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java#L59]
 has a field called _idAlternatives_ that is only used in a test and in a 
separate function that is also only invoked from a unit test (i.e. it can be 
deleted).

It also has fields operatorIDs and operatorIdsAlternatives , their getters are 
called in similar situations, operatorIdsAlternatives's getter has an extra 
invocation in 
[ExecutionJobVertex::includeAlternativeOperatorIDs|https://github.com/apache/flink/blob/a9f8a0b4481719fb511436a61e36cb0e26559c79/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java#L667]
 that is only making up for a previous call to get only the operatorIDs. It 
seems like both concepts are tightly 
