[GitHub] [flink] HuangZhenQiu commented on issue #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-05-18 Thread GitBox
HuangZhenQiu commented on issue #7356: [FLINK-10868][flink-yarn] Enforce 
maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#issuecomment-493729416
 
 
   @tillrohrmann 
   As this branch hasn't been synced with master for a while, I will create another pull request.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12551) elasticsearch6 connector print log error

2019-05-18 Thread xiongkun (JIRA)
xiongkun created FLINK-12551:


 Summary: elasticsearch6 connector print log error
 Key: FLINK-12551
 URL: https://issues.apache.org/jira/browse/FLINK-12551
 Project: Flink
  Issue Type: Bug
  Components: Connectors / ElasticSearch
Affects Versions: 1.6.3
Reporter: xiongkun


While using the Elasticsearch connector in a running job, I found that some data 
was not inserted into Elasticsearch. I checked the logs for clues, but they did not 
contain the important details, so I read the source code 
(org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase) and 
found an error in how the ERROR log is written.

 
{code:java}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    if (response.hasFailures()) {
        BulkItemResponse itemResponse;
        Throwable failure;
        RestStatus restStatus;

        try {
            for (int i = 0; i < response.getItems().length; i++) {
                itemResponse = response.getItems()[i];
                failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                if (failure != null) {
                    LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure);

                    restStatus = itemResponse.getFailure().getStatus();
                    if (restStatus == null) {
                        failureHandler.onFailure(request.requests().get(i), failure, -1, requestIndexer);
                    } else {
                        failureHandler.onFailure(request.requests().get(i), failure, restStatus.getStatus(), requestIndexer);
                    }
                }
            }
        } catch (Throwable t) {
            // fail the sink and skip the rest of the items
            // if the failure handler decides to throw an exception
            failureThrowable.compareAndSet(null, t);
        }
    }

    if (flushOnCheckpoint) {
        numPendingRequests.getAndAdd(-request.numberOfActions());
    }
}
{code}
{code:java}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
    LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause());

    try {
        for (ActionRequest action : request.requests()) {
            failureHandler.onFailure(action, failure, -1, requestIndexer);
        }
    } catch (Throwable t) {
        // fail the sink and skip the rest of the items
        // if the failure handler decides to throw an exception
        failureThrowable.compareAndSet(null, t);
    }

    if (flushOnCheckpoint) {
        numPendingRequests.getAndAdd(-request.numberOfActions());
    }
}
{code}
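The report does not say exactly which call is wrong, but a plausible reading is that the bulk-level {{afterBulk}} passes {{failure.getCause()}} (which may be null) to the logger and thereby hides the root cause. A minimal, hypothetical correction would pass the throwable itself so SLF4J logs the full stack trace; this is only an illustration of the reported symptom, not the fix that was actually merged:
{code:java}
// Hypothetical correction: pass the Throwable itself as the last argument so
// SLF4J logs its full stack trace instead of a possibly-null cause.
LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure);
{code}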
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-05-18 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r285362730
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ##
 @@ -55,6 +55,18 @@
" default, the port of the JobManager, because the same 
ActorSystem is used." +
" Its not possible to use this configuration key to 
define port ranges.");
 
+   /**
+* Defines the maximum number of workers (YARN / Mesos) failure can 
happen in a minute.
 
 Review comment:
   From our production experience, it would be better to let the job fail 
immediately so that an engineer gets involved in the issue. The failure rate here 
is just to distinguish whether it is a transient failure or a real issue caused 
by external systems such as HDFS. Thus, I am not sure we need to wait for the 
failure rate to fall below the maximum.
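
   For reference, a declaration of such an option in ResourceManagerOptions could look roughly like the sketch below; the key name, type, and default value are assumptions for illustration, not the option actually merged in this PR:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ResourceManagerOptionsSketch {

    // Hypothetical option: maximum number of worker (YARN / Mesos) failures
    // tolerated per minute before the ResourceManager fails the job.
    public static final ConfigOption<Double> MAXIMUM_WORKERS_FAILURE_RATE_PER_MIN =
        ConfigOptions.key("resourcemanager.maximum-workers-failure-rate")
            .defaultValue(10.0)
            .withDescription("Maximum number of worker (YARN / Mesos) failures per minute"
                + " before the job is failed.");
}
```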


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8477: [FLINK-12241][hive] Support function related operations in GenericHiveMetastoreCatalog

2019-05-18 Thread GitBox
bowenli86 commented on issue #8477: [FLINK-12241][hive] Support function 
related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8477#issuecomment-493695815
 
 
   cc @xuefuz @lirui-apache 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12550) hostnames with a dot never receive local input splits

2019-05-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12550:
---
Labels: pull-request-available  (was: )

> hostnames with a dot never receive local input splits
> -
>
> Key: FLINK-12550
> URL: https://issues.apache.org/jira/browse/FLINK-12550
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataSet
>Affects Versions: 1.8.0
>Reporter: Felix seibert
>Priority: Major
>  Labels: pull-request-available
>
> LocatableInputSplitAssigner (in package api.common.io) fails to assign local 
> input splits to hosts whose hostname contains a dot ("."). To reproduce add 
> the following test to LocatableSplitAssignerTest and execute it. It will 
> always fail. In my mind, this is contrary to the expected behaviour, which is 
> that the host should obtain the one split that is stored on the very same 
> machine.
>  
> {code:java}
> @Test
> public void testLocalSplitAssignmentForHostWithDomainName() {
>try {
>   String hostNameWithDot = "testhost.testdomain";
>   // load one split
>   Set splits = new HashSet();
>   splits.add(new LocatableInputSplit(0, hostNameWithDot));
>   // get all available splits
>   LocatableInputSplitAssigner ia = new 
> LocatableInputSplitAssigner(splits);
>   InputSplit is = null;
>   ia.getNextInputSplit(hostNameWithDot, 0);
>   assertEquals(0, ia.getNumberOfRemoteAssignments());
>   assertEquals(1, ia.getNumberOfLocalAssignments());
>}
>catch (Exception e) {
>   e.printStackTrace();
>   fail(e.getMessage());
>}
> }
> {code}
> I also experienced this error in practice, and will later today open a pull 
> request to fix it.
>  
> Note: I'm not sure if I selected the correct component category.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8478: [FLINK-12550] fix local input split assignment for hostnames containing a dot (".")

2019-05-18 Thread GitBox
flinkbot commented on issue #8478: [FLINK-12550] fix local input split 
assignment for hostnames containing a dot (".")
URL: https://github.com/apache/flink/pull/8478#issuecomment-493695413
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] felse opened a new pull request #8478: [FLINK-12550] fix local input split assignment for hostnames containing a dot (".")

2019-05-18 Thread GitBox
felse opened a new pull request #8478: [FLINK-12550] fix local input split 
assignment for hostnames containing a dot (".")
URL: https://github.com/apache/flink/pull/8478
 
 
   ## What is the purpose of the change
   LocatableInputSplitAssigner does not assign input splits locally if the 
hostname contains a dot ("."), like in hostname.domain. This PR should fix this 
issue.
   
   ## Brief change log
   The hostnames of the input split locations are now prepared in the same way 
as the hostname of the TaskManager that is getting the next input split, before 
they are compared to each other.
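
   For illustration only, that idea amounts to something like the sketch below; the helper name and the exact normalization are assumptions for this sketch, not necessarily the code in this PR:

```java
import java.util.Locale;

// Hypothetical helper illustrating "prepare both hostnames the same way
// before comparing them"; names are made up for this sketch.
public final class HostnameNormalizer {

    /** Lower-cases the hostname and strips the domain, e.g. "TestHost.testdomain" -> "testhost". */
    public static String normalize(String hostname) {
        String lower = hostname.toLowerCase(Locale.US);
        int dot = lower.indexOf('.');
        return dot > 0 ? lower.substring(0, dot) : lower;
    }

    /** A split counts as local if both sides normalize to the same value. */
    public static boolean isLocal(String splitHost, String taskManagerHost) {
        return normalize(splitHost).equals(normalize(taskManagerHost));
    }
}
```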
   
   ## Verifying this change
   A unit test has been added (testLocalSplitAssignmentForHostWithDomainName in 
LocatableSplitAssignerTest). This test fails with all previous versions of 
LocatableInputSplitAssigner and passes with the change introduced in this PR.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12550) hostnames with a dot never receive local input splits

2019-05-18 Thread Felix seibert (JIRA)
Felix seibert created FLINK-12550:
-

 Summary: hostnames with a dot never receive local input splits
 Key: FLINK-12550
 URL: https://issues.apache.org/jira/browse/FLINK-12550
 Project: Flink
  Issue Type: Bug
  Components: API / DataSet
Affects Versions: 1.8.0
Reporter: Felix seibert


LocatableInputSplitAssigner (in package api.common.io) fails to assign local 
input splits to hosts whose hostname contains a dot ("."). To reproduce add the 
following test to LocatableSplitAssignerTest and execute it. It will always 
fail. In my mind, this is contrary to the expected behaviour, which is that the 
host should obtain the one split that is stored on the very same machine.

 
{code:java}
@Test
public void testLocalSplitAssignmentForHostWithDomainName() {
   try {
  String hostNameWithDot = "testhost.testdomain";

  // load one split
  Set splits = new HashSet();
  splits.add(new LocatableInputSplit(0, hostNameWithDot));

  // get all available splits
  LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
  InputSplit is = null;
  ia.getNextInputSplit(hostNameWithDot, 0);

  assertEquals(0, ia.getNumberOfRemoteAssignments());
  assertEquals(1, ia.getNumberOfLocalAssignments());
   }
   catch (Exception e) {
  e.printStackTrace();
  fail(e.getMessage());
   }
}
{code}
I also experienced this error in practice, and will later today open a pull 
request to fix it.

 

Note: I'm not sure if I selected the correct component category.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7883) Make savepoints atomic with respect to state and side effects

2019-05-18 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843189#comment-16843189
 ] 

Vahid Hashemian commented on FLINK-7883:


We are also very interested in this enhancement, which would allow bounded 
consumption by {{FlinkKafkaConsumer}}.

> Make savepoints atomic with respect to state and side effects
> -
>
> Key: FLINK-7883
> URL: https://issues.apache.org/jira/browse/FLINK-7883
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Connectors / Kafka, Runtime / State 
> Backends
>Affects Versions: 1.3.2, 1.4.0
>Reporter: Antoine Philippot
>Priority: Major
>
> For a cancel with savepoint command, the JobManager triggers the cancel call 
> once the savepoint is finished, but during the savepoint execution, the Kafka 
> source continues to poll new messages, which will not be part of the savepoint 
> and will be replayed on the next application start.
> A solution could be to stop fetching from the source stream task before 
> triggering the savepoint.
> I suggest adding an interface {{StoppableFetchingSourceFunction}} with a 
> method {{stopFetching}} that existing SourceFunction implementations could 
> implement.
> We can add a {{stopFetchingSource}} property in the {{CheckpointOptions}} 
> class to pass the desired behaviour from 
> {{JobManager.handleMessage(CancelJobWithSavepoint)}} to 
> {{SourceStreamTask.triggerCheckpoint}}.
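
A minimal sketch of the proposed interface, using the names from the description above (illustrative only, not part of any released Flink API):
{code:java}
// Sketch of the interface proposed in the description; the name and method
// follow the description and are assumptions, not committed Flink code.
public interface StoppableFetchingSourceFunction {

    /**
     * Asks the source to stop fetching new records (e.g. stop polling Kafka)
     * while the task stays alive, so that a savepoint taken afterwards does
     * not include records that would be replayed on the next start.
     */
    void stopFetching();
}
{code}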



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] azagrebin commented on issue #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor

2019-05-18 Thread GitBox
azagrebin commented on issue #7757: [FLINK-11630] Triggers the termination of 
all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/7757#issuecomment-493680887
 
 
   @kisimple could you rebase on latest master and squash the empty commits? It 
seems a fix has been merged in the meantime.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on issue #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-18 Thread GitBox
azagrebin commented on issue #8416: [FLINK-12331] Introduce partition/gate 
setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#issuecomment-493680455
 
 
   Thanks for the reviews @zentol @zhijiangW, I addressed the comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-18 Thread GitBox
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285343015
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
 ##
 @@ -40,35 +38,38 @@ public static ResultPartition 
createPartition(ResultPartitionType type) {
false);
}
 
-   public static ResultPartition createPartition(ResultPartitionType type, 
int numChannels) {
-   return createPartition(new 
NoOpResultPartitionConsumableNotifier(), type, numChannels, false);
-   }
-
public static ResultPartition createPartition(
ResultPartitionConsumableNotifier notifier,
ResultPartitionType type,
boolean sendScheduleOrUpdateConsumersMessage) {
 
-   return createPartition(notifier, type, 1, 
sendScheduleOrUpdateConsumersMessage);
+   return createPartition(null, notifier, type, 1, 
sendScheduleOrUpdateConsumersMessage);
}
 
public static ResultPartition createPartition(
 
 Review comment:
   Well, certain short-cuts are still shorter if used in many places. Not sure; 
I'll reduce it a bit more, but at the moment I do not see it as strictly 
necessary.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-18 Thread GitBox
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285342718
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -88,35 +86,62 @@
 
private final TaskEventPublisher taskEventPublisher;
 
-   private final IOManager ioManager;
+   private final ResultPartitionFactory resultPartitionFactory;
+
+   private final SingleInputGateFactory singleInputGateFactory;
 
private boolean isShutdown;
 
-   public NetworkEnvironment(
-   NetworkEnvironmentConfiguration config,
-   TaskEventPublisher taskEventPublisher,
-   MetricGroup metricGroup,
-   IOManager ioManager) {
-   this.config = checkNotNull(config);
+   private NetworkEnvironment(
+   NetworkEnvironmentConfiguration config,
+   NetworkBufferPool networkBufferPool,
+   ConnectionManager connectionManager,
+   ResultPartitionManager resultPartitionManager,
+   TaskEventPublisher taskEventPublisher,
+   ResultPartitionFactory resultPartitionFactory,
+   SingleInputGateFactory singleInputGateFactory) {
+
+   this.config = config;
+   this.networkBufferPool = networkBufferPool;
+   this.connectionManager = connectionManager;
+   this.resultPartitionManager = resultPartitionManager;
+   this.taskEventPublisher = taskEventPublisher;
+   this.resultPartitionFactory = resultPartitionFactory;
+   this.singleInputGateFactory = singleInputGateFactory;
+   this.isShutdown = false;
+   }
 
-   this.networkBufferPool = new 
NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
+   public static NetworkEnvironment create(
+   NetworkEnvironmentConfiguration config,
+   TaskEventPublisher taskEventPublisher,
+   MetricGroup metricGroup,
+   IOManager ioManager) {
 
-   NettyConfig nettyConfig = config.nettyConfig();
-   if (nettyConfig != null) {
-   this.connectionManager = new 
NettyConnectionManager(nettyConfig, config.isCreditBased());
-   } else {
-   this.connectionManager = new LocalConnectionManager();
-   }
+   NettyConfig nettyConfig = checkNotNull(config).nettyConfig();
+   ConnectionManager connectionManager = nettyConfig != null ?
+   new NettyConnectionManager(nettyConfig, 
config.isCreditBased()) : new LocalConnectionManager();
 
-   this.resultPartitionManager = new ResultPartitionManager();
-
-   this.taskEventPublisher = checkNotNull(taskEventPublisher);
+   NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+   config.numNetworkBuffers(), config.networkBufferSize(), 
config.networkBuffersPerChannel());
 
registerNetworkMetrics(metricGroup, networkBufferPool);
 
-   this.ioManager = checkNotNull(ioManager);
-
-   isShutdown = false;
+   ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
+   ResultPartitionFactory resultPartitionFactory = new 
ResultPartitionFactory(
+   resultPartitionManager, checkNotNull(ioManager), 
networkBufferPool,
+   config.networkBuffersPerChannel(), 
config.floatingNetworkBuffersPerGate());
 
 Review comment:
   In general, I would prefer classes/functions to have as few dependencies as 
possible when the full `config` is not needed; it is easier to see the real 
dependencies and to reuse them in other places, e.g. to avoid mocking the full 
config in tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-18 Thread GitBox
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285342471
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
 ##
 @@ -133,31 +103,64 @@ public static LocalInputChannel createLocalInputChannel(
int initialBackoff,
int maxBackoff) {
 
-   return new LocalInputChannel(
-   inputGate,
-   0,
-   new ResultPartitionID(),
-   partitionManager,
-   new TaskEventDispatcher(),
-   initialBackoff,
-   maxBackoff,
-   newUnregisteredInputChannelMetrics());
+   return InputChannelBuilder.newBuilder()
+   .setPartitionManager(partitionManager)
+   .setInitialBackoff(initialBackoff)
+   .setMaxBackoff(maxBackoff)
+   .buildLocal(inputGate);
}
 
public static RemoteInputChannel createRemoteInputChannel(
SingleInputGate inputGate,
int channelIndex,
ConnectionManager connectionManager) {
 
-   return new RemoteInputChannel(
-   inputGate,
-   channelIndex,
-   new ResultPartitionID(),
-   STUB_CONNECTION_ID,
-   connectionManager,
-   0,
-   0,
-   newUnregisteredInputChannelMetrics());
+   return InputChannelBuilder.newBuilder()
+   .setChannelIndex(channelIndex)
+   .setConnectionManager(connectionManager)
+   .buildRemote(inputGate);
+   }
+
+   public static RemoteInputChannel createRemoteInputChannel(
+   SingleInputGate inputGate,
+   PartitionRequestClient client,
+   MemorySegmentProvider memorySegmentProvider) {
+
+   return InputChannelBuilder.newBuilder()
+   
.setConnectionManager(mockConnectionManagerWithPartitionRequestClient(client))
+   .setMemorySegmentProvider(memorySegmentProvider)
+   .buildRemote(inputGate);
+   }
+
+   public static ConnectionManager 
mockConnectionManagerWithPartitionRequestClient(PartitionRequestClient client) {
 
 Review comment:
   We could consider it, but this sounds like another refactoring :) for a 
later follow-up.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] mr-cloud commented on issue #8464: [hotfix] fix typos.

2019-05-18 Thread GitBox
mr-cloud commented on issue #8464: [hotfix] fix typos.
URL: https://github.com/apache/flink/pull/8464#issuecomment-493675884
 
 
   This is my first attempt to contribute. I'd like to know why my PR has not 
been accepted yet and how it's going now. What else do I need to do to finish 
this task successfully? Thanks for your help :-) @rmetzger 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-18 Thread GitBox
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285341747
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
 ##
 @@ -48,7 +48,7 @@ public void testDestroyWhileBlockingRequest() throws 
Exception {
LocalBufferPool localBufferPool = null;
 
try {
-   networkBufferPool = new NetworkBufferPool(1, 4096);
+   networkBufferPool = new NetworkBufferPool(1, 4096, 1);
 
 Review comment:
   I did not want to explode the number of constructors for tests, especially for 
one not very important parameter. I think adding it in the tests does not make 
them too long.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-18 Thread GitBox
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285341674
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 ##
 @@ -69,14 +71,19 @@
 
private int numTotalRequiredBuffers;
 
+   private final int numberOfRequestedBuffers;
 
 Review comment:
   I would name it `numberOfSegmentsToRequest`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-18 Thread GitBox
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285341508
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
 ##
 @@ -106,15 +106,15 @@ public void testDoublePartitionRequest() throws 
Exception {
final PartitionRequestClient client = new 
PartitionRequestClient(
channel, handler, mock(ConnectionID.class), 
mock(PartitionRequestClientFactory.class));
 
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(10, 32);
+   final int numExclusiveBuffers = 2;
 
 Review comment:
   I left it where it was used somewhere else, like in assertions, for 
readability. I guess readability was also the original intent.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-18 Thread GitBox
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285341508
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
 ##
 @@ -106,15 +106,15 @@ public void testDoublePartitionRequest() throws 
Exception {
final PartitionRequestClient client = new 
PartitionRequestClient(
channel, handler, mock(ConnectionID.class), 
mock(PartitionRequestClientFactory.class));
 
-   final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(10, 32);
+   final int numExclusiveBuffers = 2;
 
 Review comment:
   I left it where it was used somewhere else, like in assertions, for 
readability


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12519) Introduce planner rules about semi/anti join

2019-05-18 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-12519.
--
   Resolution: Implemented
Fix Version/s: 1.9.0

merged in 1.9.0: 4fa387164cea44f8e0bac1aadab11433c0f0ff2b

> Introduce planner rules about semi/anti join
> 
>
> Key: FLINK-12519
> URL: https://issues.apache.org/jira/browse/FLINK-12519
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This issue aims to introduce planner rules about semi/anti join, rules 
> include:
> 1. {{FlinkSemiAntiJoinFilterTransposeRule}} that pushes semi/anti join down 
> in a tree past a filter
> 2. {{FlinkSemiAntiJoinJoinTransposeRule}} that pushes semi/anti join down in 
> a tree past a non semi/anti join
> 3. {{FlinkSemiAntiJoinProjectTransposeRule}} that pushes semi/anti join down in 
> a tree past a project
> 4. {{ProjectSemiAntiJoinTransposeRule}} that pushes a project down in a tree 
> past a semi/anti join
> planner rules about non semi/anti join will be introduced in 
> [FLINK-12509|https://issues.apache.org/jira/browse/FLINK-12509].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] KurtYoung merged pull request #8450: [FLINK-12519] [table-planner-blink] Introduce planner rules about semi/anti join

2019-05-18 Thread GitBox
KurtYoung merged pull request #8450: [FLINK-12519] [table-planner-blink] 
Introduce planner rules about semi/anti join
URL: https://github.com/apache/flink/pull/8450
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-18 Thread GitBox
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285340688
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
 ##
 @@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Builder for various {@link InputChannel} types.
+ */
+public class InputChannelBuilder {
+   static final ConnectionID STUB_CONNECTION_ID =
+   new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+
+   private int channelIndex = 0;
+   private ResultPartitionID partitionId = new ResultPartitionID();
+   private ConnectionID connectionID = STUB_CONNECTION_ID;
+   private ResultPartitionManager partitionManager = new 
ResultPartitionManager();
+   private TaskEventPublisher taskEventPublisher = new 
TaskEventDispatcher();
+   private ConnectionManager connectionManager = new 
LocalConnectionManager();
+   private int initialBackoff = 0;
+   private int maxBackoff = 0;
+   private InputChannelMetrics metrics = 
InputChannelTestUtils.newUnregisteredInputChannelMetrics();
+
+   public static InputChannelBuilder newBuilder() {
+   return new InputChannelBuilder();
+   }
+
+   public InputChannelBuilder setChannelIndex(int channelIndex) {
+   this.channelIndex = channelIndex;
+   return this;
+   }
+
+   public InputChannelBuilder setPartitionId(ResultPartitionID 
partitionId) {
+   this.partitionId = partitionId;
+   return this;
+   }
+
+   public InputChannelBuilder setPartitionManager(ResultPartitionManager 
partitionManager) {
+   this.partitionManager = partitionManager;
+   return this;
+   }
+
+   InputChannelBuilder setTaskEventPublisher(TaskEventPublisher 
taskEventPublisher) {
+   this.taskEventPublisher = taskEventPublisher;
+   return this;
+   }
+
+   public InputChannelBuilder setConnectionManager(ConnectionManager 
connectionManager) {
+   this.connectionManager = connectionManager;
+   return this;
+   }
+
+   public InputChannelBuilder setInitialBackoff(int initialBackoff) {
+   this.initialBackoff = initialBackoff;
+   return this;
+   }
+
+   public InputChannelBuilder setMaxBackoff(int maxBackoff) {
+   this.maxBackoff = maxBackoff;
+   return this;
+   }
+
+   public InputChannelBuilder setMetrics(InputChannelMetrics metrics) {
+   this.metrics = metrics;
+   return this;
+   }
+
+   InputChannelBuilder setupFromNetworkEnvironment(NetworkEnvironment 
network) {
+   this.partitionManager = network.getResultPartitionManager();
+   this.connectionManager = network.getConnectionManager();
+   this.initialBackoff = 
network.getConfiguration().partitionRequestInitialBackoff();
+   this.maxBackoff = 
network.getConfiguration().partitionRequestMaxBackoff();
+   return this;
+   }
+
+   UnknownInputChannel buildUnknown(SingleInputGate inputGate) {
+   UnknownInputChannel channel = new UnknownInputChannel(
+ 

[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-18 Thread GitBox
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285340619
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+
+/**
+ * Utility class to encapsulate the logic of building a {@link 
SingleInputGate} instance.
+ */
+public class SingleInputGateBuilder {
+
+   private JobID jobId = new JobID();
+
+   private IntermediateDataSetID intermediateDataSetID = new 
IntermediateDataSetID();
 
 Review comment:
   Though a builder is not supposed to be immutable in general, we can keep it 
final for now and change it if needed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-18 Thread GitBox
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285340366
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 ##
 @@ -114,11 +114,12 @@ public void testConcurrentConsumeMultiplePartitions() 
throws Exception {
.setNumTargetKeyGroups(parallelism)
.setResultPartitionManager(partitionManager)
.setSendScheduleOrUpdateConsumersMessage(true)
+   .setBufferPoolFactory(p ->
+   
networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize))
.build();
 
// Create a buffer pool for this partition
 
 Review comment:
   Well, this is still what actually happens at the moment; why is it out-dated?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment

2019-05-18 Thread GitBox
azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285339824
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
 ##
 @@ -88,7 +90,9 @@ public void testConsumptionWithRemoteChannels() throws 
Exception {
final ConnectionManager connManager = 
createDummyConnectionManager();
final Source[] sources = new Source[numberOfChannels];
 
-   final SingleInputGate gate = 
createSingleInputGate(numberOfChannels);
 
 Review comment:
   True, actually I do not mind short-hands. Though, I would remove one of the 
short-hands with 3 parameters. I think the builder is more readable in the case 
of many parameters.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12424) Supports dag (multiple-sinks query) optimization

2019-05-18 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-12424.
--
   Resolution: Implemented
Fix Version/s: 1.9.0

merged in 1.9.0: e038a801a87f25b30a1b47ffe5710e5d9bd44c9b

> Supports dag (multiple-sinks query) optimization
> 
>
> Key: FLINK-12424
> URL: https://issues.apache.org/jira/browse/FLINK-12424
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
> Attachments: image-2019-05-07-13-33-02-793.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, the Flink planner optimizes the plan in the {{writeToSink}} method. If 
> there is more than one sink in a query, each sink-tree is optimized 
> independently and the resulting execution plans are also completely independent. 
> In practice, there is a high probability of duplicate computation for a 
> multiple-sinks query. This issue aims to resolve that problem. 
> The basic idea of the solution is as follows: 
> 1. lazy optimization: do not optimize the plan in the {{writeToSink}} method, 
> just put the plan into a collection.
> 2. whole-plan optimization and execution: a new {{execute}} method is added 
> to {{TableEnvironment}}; this method triggers whole-plan optimization and 
> executes the job.
> The basic idea of dag (multiple-sinks query) optimization:
> 1. decompose the dag into different blocks; the leaf block is the common 
> sub-plan
> 2. optimize each block from leaf block to root block; each block only needs 
> to be optimized once
> e.g. 
> {code:scala}
> val table = tableEnv.sqlQuery("select * from (select a as a1, b as b1 from 
> MyTable where a > 0) t1, (select b as b2, c as c2 from MyTable where c is not 
> null) t2 where a1 = b2")
> tableEnv.registerTable("TempTable", table)
> val table1 = tableEnv.sqlQuery("select a1, b1 from TempTable where a1 >= 70")
> tableEnv.writeToSink(table1, Sink1)
> val table2 = tableEnv.sqlQuery("select a1, c2 from TempTable where a1 < 70")
> tableEnv.writeToSink(table2, Sink2)
> {code}
>  !image-2019-05-07-13-33-02-793.png! 
> The above plan will be decomposed into 3 blocks; block1 is the input of 
> block2 and block3. block2 and block3 will be optimized after block1 has been 
> optimized.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] KurtYoung merged pull request #8356: [FLINK-12424] [table-planner-blink] Supports dag (multiple-sinks query) optimization

2019-05-18 Thread GitBox
KurtYoung merged pull request #8356: [FLINK-12424] [table-planner-blink] 
Supports dag (multiple-sinks query) optimization
URL: https://github.com/apache/flink/pull/8356
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-6263) Leader error in Kafka producer on leader change (broker restart/failrue)

2019-05-18 Thread Truong Duc Kien (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843135#comment-16843135
 ] 

Truong Duc Kien commented on FLINK-6263:


I think this is a duplicate of https://issues.apache.org/jira/browse/FLINK-3066 

The workaround mentioned in that bug is 
{quote}Set the number of retries to a value above 0. By default Kafka sets it 
to 0 to avoid duplicates.
{quote}
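
For illustration, that workaround might look roughly like the snippet below, i.e. raising the standard Kafka producer {{retries}} property on the properties passed to the Flink Kafka producer. The connector class, broker address, topic name, and schema are placeholders and depend on the connector version in use:
{code:java}
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaRetriesExample {

    public static void main(String[] args) {
        // Raise the producer's retry count so a transient
        // NotLeaderForPartitionException during a leader change is retried
        // instead of failing the sink. Broker and topic are placeholders.
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "broker:9092");
        producerProps.setProperty("retries", "3");

        FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(
            "my-topic",
            new SimpleStringSchema(),
            producerProps);
        // The producer is then added as a sink: stream.addSink(producer);
    }
}
{code}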
 

> Leader error in Kafka producer on leader change (broker restart/failrue)
> 
>
> Key: FLINK-6263
> URL: https://issues.apache.org/jira/browse/FLINK-6263
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Major
>
> We have observed the following error in the Kafka producer
> java.lang.Exception: Failed to send data to Kafka: This server is not the 
> leader for that topic-partition.
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:376)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:293)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: 
> This server is not the leader for that topic-partition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] lamber-ken commented on issue #8265: [FLINK-12302][runtime] Fixed the wrong finalStatus of yarn application when application finished

2019-05-18 Thread GitBox
lamber-ken commented on issue #8265: [FLINK-12302][runtime] Fixed the wrong 
finalStatus of yarn application when application finished
URL: https://github.com/apache/flink/pull/8265#issuecomment-493664892
 
 
   Hi @GJL, I have given detailed steps to reproduce this problem in 
[JIRA-12302](https://issues.apache.org/jira/browse/FLINK-12302).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services