[GitHub] ibuenros commented on issue #2548: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-02-07 Thread GitBox
ibuenros commented on issue #2548: [GOBBLIN-677] - Allow early termination of 
Gobblin jobs based on a predicate on the job progress
URL: 
https://github.com/apache/incubator-gobblin/pull/2548#issuecomment-461545394
 
 
   @htran1 @sv2000 can you take a look?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ibuenros opened a new pull request #2548: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-02-07 Thread GitBox
ibuenros opened a new pull request #2548: [GOBBLIN-677] - Allow early 
termination of Gobblin jobs based on a predicate on the job progress
URL: https://github.com/apache/incubator-gobblin/pull/2548
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-677
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if 
applicable):
   
   This PR adds the possibility of terminating a job early once a predicate 
(based on the job progress) is satisfied. 
   
   There are four main changes:
   - `ReflectivePredicateEvaluator` is an evaluator for SQL queries which takes 
as an input a list of objects implementing a reference type. Each object is 
considered a row, and each public method in the reference type is a column. The 
evaluator runs the expression on the list of objects and expects a result with 
a single boolean column, which is then `AND`'d to obtain the result of the 
predicate. This is powered by Calcite, and can be used for expressive policies 
in a number of use cases.
   - `FiniteStateMachine` is a thread-safe finite state machine implementation 
that gates certain actions on the current state of the machine and whether a 
transition is legal. It is used, for example, to manage the state of the MR job 
launcher and make sure that interruptions and cancels work correctly regardless 
of whether the MR job is pending, running, or already finished.
   - Actual early termination logic. `JobInterruptionPredicate` uses a 
`ReflectivePredicateEvaluator` with base type `JobProgress` to evaluate a 
predicate on the job progress every 30 seconds (for example, `SELECT 
elapsedTime > (30 * 60 * 1000) AND completedTasks >= totalTasks * 0.75 FROM 
jobProgress`). Once the predicate is satisfied, it calls a shutdown trigger 
specific to the current job launcher.
   - For `MRJobLauncher`, early termination is driven by creating a 
specific file in the `mrJobDir`. Once the Gobblin mappers find this file, they 
indicate to the `Extractor` that it should terminate. If the `Extractor` 
understands this command, it is supposed to terminate extraction gracefully, 
committing watermarks, etc. If it does not recognize the command, an 
exception is thrown, triggering a task failure. An implementation of graceful 
early termination for `KafkaExtractor` is provided.
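
To make the predicate semantics concrete, here is a minimal plain-Java sketch of the example query above. It does not use Calcite or the PR's `ReflectivePredicateEvaluator`. The `JobProgressRow`, `SimpleJobProgress`, and `ProgressPredicateSketch` types are hypothetical; only the column names `elapsedTime`, `completedTasks`, and `totalTasks` come from the example query. Each row yields one boolean, and the per-row results are AND'd.

```java
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical row type; the JobProgress interface in the PR may look different. */
interface JobProgressRow {
  long getElapsedTime();   // milliseconds
  int getCompletedTasks();
  int getTotalTasks();
}

/** Simple immutable implementation, used only for illustration. */
final class SimpleJobProgress implements JobProgressRow {
  private final long elapsedTime;
  private final int completedTasks;
  private final int totalTasks;

  SimpleJobProgress(long elapsedTime, int completedTasks, int totalTasks) {
    this.elapsedTime = elapsedTime;
    this.completedTasks = completedTasks;
    this.totalTasks = totalTasks;
  }

  @Override public long getElapsedTime() { return elapsedTime; }
  @Override public int getCompletedTasks() { return completedTasks; }
  @Override public int getTotalTasks() { return totalTasks; }
}

final class ProgressPredicateSketch {
  /**
   * Plain-Java counterpart of:
   * SELECT elapsedTime > (30 * 60 * 1000) AND completedTasks >= totalTasks * 0.75 FROM jobProgress
   */
  static final Predicate<JobProgressRow> EXAMPLE =
      row -> row.getElapsedTime() > 30L * 60 * 1000
          && row.getCompletedTasks() >= row.getTotalTasks() * 0.75;

  /**
   * Evaluates the predicate on every row and ANDs the per-row results.
   * Assumption: an empty list yields true here; the PR may treat that case differently.
   */
  static boolean evaluate(Predicate<JobProgressRow> predicate, List<? extends JobProgressRow> rows) {
    return rows.stream().allMatch(predicate);
  }
}
```

In this sketch a single row that fails the predicate makes the whole evaluation false, which mirrors the AND over the boolean result column described in the first bullet.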
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   There are a bunch of unit tests for the individual components. Additionally, 
tests were run on actual Gobblin executions.
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"




[GitHub] ibuenros commented on issue #2547: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-02-07 Thread GitBox
ibuenros commented on issue #2547: [GOBBLIN-677] - Allow early termination of 
Gobblin jobs based on a predicate on the job progress
URL: 
https://github.com/apache/incubator-gobblin/pull/2547#issuecomment-461545328
 
 
   Replaced by https://github.com/apache/incubator-gobblin/pull/2548




[GitHub] ibuenros closed pull request #2547: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-02-07 Thread GitBox
ibuenros closed pull request #2547: [GOBBLIN-677] - Allow early termination of 
Gobblin jobs based on a predicate on the job progress
URL: https://github.com/apache/incubator-gobblin/pull/2547
 
 
   




[GitHub] sv2000 opened a new pull request #2549: GOBBLIN-678: Make flow.executionId available in the GaaS Flow config …

2019-02-07 Thread GitBox
sv2000 opened a new pull request #2549: GOBBLIN-678: Make flow.executionId 
available in the GaaS Flow config …
URL: https://github.com/apache/incubator-gobblin/pull/2549
 
 
   …for use in job templates.
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-678
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   Job templates would like to reference flow.executionId in the job properties 
(e.g. to create tmp directories that use execution id as part of the directory 
path). Currently, for scheduled jobs, the flow execution id is not made 
available in the flow config.  
   
   
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   MultiHopFlowCompilerTest - changed job template to use flow.executionId and 
ensure the template is resolved during compilation.
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] kyuamazon opened a new pull request #2550: [GOBBLIN-679] Refactor cluster task metrics

2019-02-07 Thread GitBox
kyuamazon opened a new pull request #2550: [GOBBLIN-679] Refactor cluster task 
metrics
URL: https://github.com/apache/incubator-gobblin/pull/2550
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-679
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
  1) Refactor the GobblinHelixTaskMetrics
  2) Add the timer for task execution.
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
  
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] autumnust opened a new pull request #2551: [GOBBLIN-680] Enhance error handling on task creation

2019-02-07 Thread GitBox
autumnust opened a new pull request #2551: [GOBBLIN-680] Enhance error handling 
on task creation
URL: https://github.com/apache/incubator-gobblin/pull/2551
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   While creating a `Task` object, especially in a customized task 
execution framework, we often see heavy memory consumption in `Task` itself, 
and creation occasionally fails with `OutOfMemoryError` in one of our internal 
pipelines. Unfortunately, the original implementation does not catch 
`OutOfMemoryError` because it is not an `Exception`. We added handling and 
corresponding test code to make sure that the `countDownLatch` is still 
handled correctly whenever a virtual machine error such as `OutOfMemoryError` 
occurs. 
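
The point about `OutOfMemoryError` not being an `Exception` can be illustrated with a small sketch. This is not the PR's code: `TaskCreationSketch` and `createAndCountDown` are hypothetical names, and the factory is a stand-in for whatever builds the `Task`. The sketch assumes the goal is simply that the latch is counted down on every path.

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

final class TaskCreationSketch {
  /**
   * Runs a (hypothetical) task factory and always counts the latch down,
   * even when the factory throws an Error such as OutOfMemoryError.
   * Returns null if creation failed.
   */
  static <T> T createAndCountDown(Supplier<T> factory, CountDownLatch latch) {
    try {
      return factory.get();
    } catch (Throwable t) {
      // `catch (Exception e)` would miss OutOfMemoryError: it extends
      // VirtualMachineError, which extends Error, not Exception.
      System.err.println("Task creation failed: " + t);
      return null;
    } finally {
      // Runs on success and on failure, so threads waiting on the latch are released.
      latch.countDown();
    }
  }
}
```

Whether to swallow or rethrow the error after counting down is a design choice; the sketch swallows it only to keep the example short.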
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-680] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-680
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] autumnust commented on issue #2551: [GOBBLIN-680] Enhance error handling on task creation

2019-02-07 Thread GitBox
autumnust commented on issue #2551: [GOBBLIN-680] Enhance error handling on 
task creation
URL: 
https://github.com/apache/incubator-gobblin/pull/2551#issuecomment-461661507
 
 
   @ibuenros @htran1 @sv2000 can you help review? Thanks. 




[GitHub] asfgit closed pull request #2549: GOBBLIN-678: Make flow.executionId available in the GaaS Flow config …

2019-02-07 Thread GitBox
asfgit closed pull request #2549: GOBBLIN-678: Make flow.executionId available 
in the GaaS Flow config …
URL: https://github.com/apache/incubator-gobblin/pull/2549
 
 
   




[GitHub] arjun4084346 opened a new pull request #2552: [GOBBLIN-681] increase max size of job name

2019-02-08 Thread GitBox
arjun4084346 opened a new pull request #2552: [GOBBLIN-681] increase max size 
of job name
URL: https://github.com/apache/incubator-gobblin/pull/2552
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below! @sv2000 please review
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-XXX
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   This just increases the maximum size of a job name. Multi-hop job names need 
to be longer in order to be unique.
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   Too trivial to need a test case.
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] yukuai518 opened a new pull request #2553: [GOBBLIN-679] Refactor GobblinHelixTask metrics

2019-02-08 Thread GitBox
yukuai518 opened a new pull request #2553: [GOBBLIN-679] Refactor 
GobblinHelixTask metrics
URL: https://github.com/apache/incubator-gobblin/pull/2553
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-679
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   Refactor GobblinHelixTask metrics
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] kyuamazon closed pull request #2550: [GOBBLIN-679] Refactor cluster task metrics

2019-02-08 Thread GitBox
kyuamazon closed pull request #2550: [GOBBLIN-679] Refactor cluster task metrics
URL: https://github.com/apache/incubator-gobblin/pull/2550
 
 
   




[GitHub] yukuai518 commented on issue #2553: [GOBBLIN-679] Refactor GobblinHelixTask metrics

2019-02-08 Thread GitBox
yukuai518 commented on issue #2553: [GOBBLIN-679] Refactor GobblinHelixTask 
metrics
URL: 
https://github.com/apache/incubator-gobblin/pull/2553#issuecomment-461965924
 
 
   @htran1 Can you help review?




[GitHub] asfgit closed pull request #2551: [GOBBLIN-680] Enhance error handling on task creation

2019-02-08 Thread GitBox
asfgit closed pull request #2551: [GOBBLIN-680] Enhance error handling on task 
creation
URL: https://github.com/apache/incubator-gobblin/pull/2551
 
 
   




[GitHub] asfgit closed pull request #2552: [GOBBLIN-681] increase max size of job name

2019-02-08 Thread GitBox
asfgit closed pull request #2552: [GOBBLIN-681] increase max size of job name
URL: https://github.com/apache/incubator-gobblin/pull/2552
 
 
   




[GitHub] asfgit closed pull request #2553: [GOBBLIN-679] Refactor GobblinHelixTask metrics

2019-02-11 Thread GitBox
asfgit closed pull request #2553: [GOBBLIN-679] Refactor GobblinHelixTask 
metrics
URL: https://github.com/apache/incubator-gobblin/pull/2553
 
 
   




[GitHub] sv2000 opened a new pull request #2554: GOBBLIN-682: Create a new constructor for DatasetCleanerJob.

2019-02-12 Thread GitBox
sv2000 opened a new pull request #2554: GOBBLIN-682: Create a new constructor 
for DatasetCleanerJob.
URL: https://github.com/apache/incubator-gobblin/pull/2554
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-682
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   The current DatasetCleanerJob constructor only accepts config passed as 
azkaban.utils.Props. Here, we implement a new constructor that also accepts 
java.util.Properties.
   
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   Tested in local deployment.
   
   ### Commits
   -  [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-02-13 Thread GitBox
sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early 
termination of Gobblin jobs based on a predicate on the job progress
URL: https://github.com/apache/incubator-gobblin/pull/2548#discussion_r256496993
 
 

 ##
 File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 ##
 @@ -170,6 +181,12 @@ public MRJobLauncher(Properties jobProps, Configuration 
conf, SharedResourcesBro
   throws Exception {
 super(jobProps, metadataTags);
 
+this.fsm = new FiniteStateMachine.Builder()
 
 Review comment:
   This FSM instance (and the associated states) looks generic enough that it 
can be moved out of this class. Can we have a FSM factory class that builds out 
specific FSM types and hands it back to the caller?
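
One way to picture the factory suggested here is the sketch below. It is not Gobblin's `FiniteStateMachine` API: `LauncherState`, `SimpleStateMachine`, and `StateMachineFactory` are hypothetical names, and the set of legal transitions is an assumption chosen for illustration. The idea is that the transition table lives in one reusable place and callers receive a ready-made machine.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical launcher states, loosely mirroring those named in the diff. */
enum LauncherState { PENDING, RUNNING, CANCELLED, FAILED, SUCCESS }

/** Minimal thread-safe state machine: a transition succeeds only if it is registered as legal. */
final class SimpleStateMachine<S extends Enum<S>> {
  private final Map<S, Set<S>> allowed;
  private S current;

  SimpleStateMachine(S initial, Map<S, Set<S>> allowed) {
    this.current = initial;
    this.allowed = allowed;
  }

  synchronized S getCurrentState() {
    return current;
  }

  /** Attempts the transition; returns false and leaves the state unchanged if it is illegal. */
  synchronized boolean transitionTo(S target) {
    Set<S> targets = allowed.get(current);
    if (targets == null || !targets.contains(target)) {
      return false;
    }
    current = target;
    return true;
  }
}

/** Hypothetical factory that builds a specific machine type and hands it back to the caller. */
final class StateMachineFactory {
  static SimpleStateMachine<LauncherState> newLauncherStateMachine() {
    Map<LauncherState, Set<LauncherState>> allowed = new HashMap<>();
    // Assumed transition table: terminal states have no outgoing transitions.
    allowed.put(LauncherState.PENDING,
        EnumSet.of(LauncherState.RUNNING, LauncherState.CANCELLED, LauncherState.FAILED));
    allowed.put(LauncherState.RUNNING,
        EnumSet.of(LauncherState.CANCELLED, LauncherState.FAILED, LauncherState.SUCCESS));
    return new SimpleStateMachine<>(LauncherState.PENDING, Collections.unmodifiableMap(allowed));
  }
}
```

A launcher would then call `StateMachineFactory.newLauncherStateMachine()` in its constructor instead of assembling the states inline.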




[GitHub] sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-02-13 Thread GitBox
sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early 
termination of Gobblin jobs based on a predicate on the job progress
URL: https://github.com/apache/incubator-gobblin/pull/2548#discussion_r256505082
 
 

 ##
 File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 ##
 @@ -286,17 +316,57 @@ protected void runWorkUnits(List workUnits) 
throws Exception {
 
   @Override
   protected void executeCancellation() {
-try {
-  if (this.hadoopJobSubmitted && !this.job.isComplete()) {
-LOG.info("Killing the Hadoop MR job for job " + 
this.jobContext.getJobId());
-this.job.killJob();
-// Collect final task states.
-this.taskStateCollectorService.stopAsync().awaitTerminated();
+try (FiniteStateMachine.Transition transition = 
this.fsm.startTransition(MRJobLauncherState.CANCELLED)) {
+  if (transition.getStartState() == MRJobLauncherState.RUNNING) {
+try {
+  LOG.info("Killing the Hadoop MR job for job " + 
this.jobContext.getJobId());
+  this.job.killJob();
+  // Collect final task states.
+  this.taskStateCollectorService.stopAsync().awaitTerminated();
+} catch (IOException ioe) {
+  LOG.error("Failed to kill the Hadoop MR job for job " + 
this.jobContext.getJobId());
+  transition.changeEndState(MRJobLauncherState.FAILED);
+}
+  }
+} catch (FiniteStateMachine.UnallowedTransitionException | 
InterruptedException exc) {
+  LOG.error("Failed to cancel job " + this.jobContext.getJobId(), exc);
+}
+  }
+
+  /**
+   * Attempt a gracious interruptiong of the running job
 
 Review comment:
   Spelling error - interruption instead of interruptiong




[GitHub] sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-02-13 Thread GitBox
sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early 
termination of Gobblin jobs based on a predicate on the job progress
URL: https://github.com/apache/incubator-gobblin/pull/2548#discussion_r256509276
 
 

 ##
 File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 ##
 @@ -252,26 +270,38 @@ protected void runWorkUnits(List workUnits) 
throws Exception {
   this.taskStateCollectorService.startAsync().awaitRunning();
 
   LOG.info("Launching Hadoop MR job " + this.job.getJobName());
-  this.job.submit();
-  this.hadoopJobSubmitted = true;
+  try (FiniteStateMachine.Transition t = 
this.fsm.startTransition(MRJobLauncherState.RUNNING)) {
+try {
+  this.job.submit();
+} catch (Throwable exc) {
+  t.changeEndState(MRJobLauncherState.FAILED);
+  throw exc;
+}
+this.hadoopJobSubmitted = true;
 
-  // Set job tracking URL to the Hadoop job tracking URL if it is not set 
yet
-  if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
-jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY, 
this.job.getTrackingURL());
+// Set job tracking URL to the Hadoop job tracking URL if it is not 
set yet
+if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
+  jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY, 
this.job.getTrackingURL());
+}
+  } catch (FiniteStateMachine.UnallowedTransitionException unallowed) {
+LOG.info("Cannot start MR job.", unallowed);
   }
 
-  TimingEvent mrJobRunTimer = 
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_RUN);
-  LOG.info(String.format("Waiting for Hadoop MR job %s to complete", 
this.job.getJobID()));
-  this.job.waitForCompletion(true);
-  mrJobRunTimer.stop(ImmutableMap.of("hadoopMRJobId", 
this.job.getJobID().toString()));
+  if (this.fsm.getCurrentState().equals(MRJobLauncherState.RUNNING)) {
+JobInterruptionPredicate jobInterruptionPredicate = new 
JobInterruptionPredicate(jobState, this::interruptJob, true);
 
 Review comment:
   Can the JobInterruptionPredicate be made part of the FSM definition itself? 
I am thinking in general, each state of the FSM could potentially have a 
Predicate service of its own. The service is started/stopped as states change. 




[GitHub] sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-02-13 Thread GitBox
sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early 
termination of Gobblin jobs based on a predicate on the job progress
URL: https://github.com/apache/incubator-gobblin/pull/2548#discussion_r256510193
 
 

 ##
 File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
 ##
 @@ -679,6 +751,12 @@ protected void setup(Context context) {
 @Override
 public void run(Context context) throws IOException, InterruptedException {
   this.setup(context);
+
+  Path interruptPath = new 
Path(context.getConfiguration().get(GOBBLIN_JOB_INTERRUPT_PATH_KEY));
+  if (this.fs.exists(interruptPath)) {
 
 Review comment:
   A log message here will be useful for debugging purposes.
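
As an illustration of the marker-file check with the requested log line, here is a minimal sketch. It uses `java.nio.file` on the local file system in place of the Hadoop `FileSystem` used in the diff, and `InterruptMarkerSketch`, `requestInterrupt`, and `interruptRequested` are hypothetical names rather than anything in the PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Logger;

final class InterruptMarkerSketch {
  private static final Logger LOG = Logger.getLogger(InterruptMarkerSketch.class.getName());

  /** Launcher side: request interruption by creating the marker file if it is absent. */
  static void requestInterrupt(Path marker) throws IOException {
    if (Files.notExists(marker)) {
      Files.createFile(marker);
    }
  }

  /** Worker side: returns true, and logs why, when the marker file is present. */
  static boolean interruptRequested(Path marker) {
    boolean requested = Files.exists(marker);
    if (requested) {
      LOG.info("Found interrupt marker " + marker + "; requesting early termination.");
    }
    return requested;
  }
}
```

Logging the marker path at the point of detection makes it possible to tell from the mapper logs why a task stopped early.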




[GitHub] sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-02-13 Thread GitBox
sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early 
termination of Gobblin jobs based on a predicate on the job progress
URL: https://github.com/apache/incubator-gobblin/pull/2548#discussion_r256513391
 
 

 ##
 File path: 
gobblin-runtime/src/test/java/org/apache/gobblin/util/ReflectivePredicateEvaluatorTest.java
 ##
 @@ -0,0 +1,108 @@
+package org.apache.gobblin.util;
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import lombok.Data;
+
+public class ReflectivePredicateEvaluatorTest {
+
+   @Test
+   public void simpleTest() throws Exception {
+   ReflectivePredicateEvaluator evaluator = new 
ReflectivePredicateEvaluator(
+   "SELECT anInt = 1 FROM myInterface", 
MyInterface.class);
+
+   Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, 
"foo")));
+   Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, 
"foo")));
+
+   Assert.assertTrue(evaluator.evaluate("SELECT anInt = 1 OR 
aString = 'foo' FROM myInterface",
+   new MyImplementation(1, "bar")));
+   Assert.assertTrue(evaluator.evaluate("SELECT anInt = 1 OR 
aString = 'foo' FROM myInterface",
+   new MyImplementation(2, "foo")));
+   Assert.assertFalse(evaluator.evaluate("SELECT anInt = 1 OR 
aString = 'foo' FROM myInterface",
+   new MyImplementation(2, "bar")));
+   }
+
+   @Test
+   public void testWithAggregations() throws Exception {
+   ReflectivePredicateEvaluator evaluator = new 
ReflectivePredicateEvaluator(
+   "SELECT sum(anInt) = 5 FROM myInterface", 
MyInterface.class);
+
+   Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, 
"foo")));
+   Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, 
"foo"), new MyImplementation(4, "foo")));
+   Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, 
"foo"), new MyImplementation(4, "foo")));
+   }
+
+   @Test
+   public void testWithAggregationsAndFilter() throws Exception {
+   ReflectivePredicateEvaluator evaluator = new 
ReflectivePredicateEvaluator(
+   "SELECT sum(anInt) = 5 FROM myInterface WHERE 
aString = 'foo'", MyInterface.class);
+
+   Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, 
"foo")));
+   Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, 
"foo"), new MyImplementation(4, "foo"), new MyImplementation(4, "bar")));
+   Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, 
"foo"), new MyImplementation(4, "foo"), new MyImplementation(4, "foo")));
+   }
+
+   @Test
+   public void testMultipleInterfaces() throws Exception {
+   ReflectivePredicateEvaluator evaluator = new 
ReflectivePredicateEvaluator(
+   "SELECT true = ALL (SELECT sum(anInt) = 2 AS 
satisfied FROM myInterface UNION SELECT sum(anInt) = 3 AS satisfied FROM 
myInterface2)",
+   MyInterface.class, MyInterface2.class);
+   Assert.assertFalse(evaluator.evaluate(new MyImplementation(2, 
"foo")));
+   Assert.assertTrue(evaluator.evaluate(new MyImplementation(2, 
"foo"), new MyImplementation2(3)));
+   Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, 
"foo"), new MyImplementation2(3), new MyImplementation(1, "foo")));
+   }
+
+   @Test
+   public void testMultipleOutputs() throws Exception {
+   ReflectivePredicateEvaluator evaluator =
+   new ReflectivePredicateEvaluator("SELECT anInt 
= 1 FROM myInterface", MyInterface.class);
+   Assert.assertTrue(evaluator.evaluate(new MyImplementation(1, 
"bar"), new MyImplementation(1, "foo")));
+   Assert.assertFalse(evaluator.evaluate(new MyImplementation(1, 
"bar"), new MyImplementation(2, "foo")));
+   }
+
+   @Test
+   public void testInvalidSQL() throws Exception {
+   try {
+   ReflectivePredicateEvaluator evaluator =
+   new 
ReflectivePredicateEvaluator("SELECT anInt FROM myInterface", 
MyInterface.class);
+   Assert.fail();
+   } catch (IllegalArgumentException exc) {
+   // Expected
+   }
+   }
+
+   @Test
+   public void testNoOutputs() throws Exception {
+   try {
+   ReflectivePredicateEvaluator evaluator =
+   new 
ReflectivePredicateEvaluator("SELECT anInt = 1 FROM myInterface WHERE aString = 
'foo'",
+   MyInterface.class);
+   evaluator.evaluate(new MyImplementation(1, "b

[GitHub] sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-02-13 Thread GitBox
sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early 
termination of Gobblin jobs based on a predicate on the job progress
URL: https://github.com/apache/incubator-gobblin/pull/2548#discussion_r256517826
 
 

 ##
 File path: 
gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java
 ##
 @@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.fsm;
+
+import java.io.Closeable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An implementation of a basic FiniteStateMachine that allows keeping track 
of its state and gating certain
+ * logic on whether a transition is valid or not.
+ *
+ * This class is useful in situations where logic is complex, possibly 
multi-threaded, and can take multiple paths. Certain
+ * pieces of logic (for example running a job, publishing a dataset, etc) can 
only happen if other actions ended correctly,
+ * and the FSM is a way of simplifying the encoding and verification of those 
conditions. It is understood that state
+ * transitions may not be instantaneous, and that other state transitions 
should not start until the current one has
+ * been resolved.
+ *
+ * All public methods of this class will wait until the FSM is in a 
non-transitioning state. If multiple transitions are
+ * queued at the same time, the order in which they are executed is 
essentially random.
+ *
+ * The states supported by FSM can be enums or instances of any base type. The 
legality of a transition is determined
+ * by equality, i.e. if a transition A -> B is legal, the current state is A' 
and the desired end state is B', the transition
+ * will be legal if A.equals(A') && B.equals(B'). This allows for storing 
additional information into the current state
+ * as long as it does not affect the equality check (i.e. fields that are not 
compared in the equals check can store
+ * state metadata, etc.).
+ *
+ * Suggested Usage:
+ * FiniteStateMachine fsm = new 
FiniteStateMachine.Builder().addTransition(START_SYMBOL, 
END_SYMBOL).build(initialSymbol);
+ *
+ * try (Transition transition = fsm.startTransition(MY_END_STATE)) {
+ *   try {
+ * // my logic
+ *   } catch (MyException exc) {
+ * transition.changeEndState(MY_ERROR);
+ *   }
+ * } catch (UnallowedTransitionException exc) {
+ *   // Cannot execute logic because it's an illegal transition!
+ * } catch (ReentrantStableStateWait exc) {
+ *  // Somewhere in the logic an instruction tried to do an operation with 
the fsm that would likely cause a deadlock
+ * } catch (AbandonedTransitionException exc) {
+ *   // Another thread initiated a transition and became inactive ending the 
transition
+ * } catch (InterruptedException exc) {
+ *   // Could not start transition because thread got interrupted while 
waiting for a non-transitioning state
+ * }
+ *
+ * @param 
+ */
+@Slf4j
+public class FiniteStateMachine {
+
+   /**
+* Used to build a {@link FiniteStateMachine} instance.
+*/
+   public static class Builder {
+   private final SetMultimap allowedTransitions;
+   private final Set universalEnds;
+
+   public Builder() {
+   this.allowedTransitions = HashMultimap.create();
+   this.universalEnds = new HashSet<>();
+   }
+
+   /**
+* Add a legal transition to the {@link FiniteStateMachine}.
+*/
+   public Builder addTransition(T startState, T endState) {
+   this.allowedTransitions.put(startState, endState);
+   return this;
+   }
+
+   /**
+* Specify that a state is a valid end state for a transition 
starting from any state. Useful for example for
+* error sta
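
For readers following the Javadoc quoted above, the equality-based transition check it describes can be sketched roughly as follows. This is a simplified, single-threaded illustration and not the code under review: the class name `SimpleFsm` and all of its methods are hypothetical, and the locking, the `Transition` handle and the exception types mentioned in the Javadoc are deliberately left out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, single-threaded sketch of an equality-based transition table. */
final class SimpleFsm<T> {
  private final Map<T, Set<T>> allowedTransitions = new HashMap<>();
  private final Set<T> universalEnds = new HashSet<>();
  private T currentState;

  SimpleFsm(T initialState) {
    this.currentState = initialState;
  }

  /** Declare start -> end as a legal transition. */
  SimpleFsm<T> addTransition(T start, T end) {
    allowedTransitions.computeIfAbsent(start, k -> new HashSet<>()).add(end);
    return this;
  }

  /** Declare a state that may be entered from any state (for example an error state). */
  SimpleFsm<T> addUniversalEnd(T end) {
    universalEnds.add(end);
    return this;
  }

  /** Legality is decided purely by equals()/hashCode() of the states involved. */
  boolean isAllowed(T end) {
    return universalEnds.contains(end)
        || allowedTransitions.getOrDefault(currentState, Collections.<T>emptySet()).contains(end);
  }

  /** Move to the end state, or throw if the transition was never declared. */
  void transitionTo(T end) {
    if (!isAllowed(end)) {
      throw new IllegalStateException("Illegal transition " + currentState + " -> " + end);
    }
    currentState = end;
  }

  T getCurrentState() {
    return currentState;
  }
}
```

Because only `equals` is consulted, a state object could carry extra metadata fields that are excluded from `equals`, which is the property the Javadoc calls out.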

[GitHub] sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-02-13 Thread GitBox
sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early 
termination of Gobblin jobs based on a predicate on the job progress
URL: https://github.com/apache/incubator-gobblin/pull/2548#discussion_r256518557
 
 

 ##
 File path: 
gobblin-utility/src/main/java/org/apache/gobblin/fsm/FiniteStateMachine.java
 ##
 @@ -0,0 +1,357 @@

[GitHub] yukuai518 opened a new pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.

2019-02-14 Thread GitBox
yukuai518 opened a new pull request #2555: [GOBBLIN-683] Add azkaban client 
retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-XXX
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
  Retry azkaban client if session expires.
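
A minimal sketch of the retry-on-expired-session idea, under stated assumptions: `SessionExpiredException`, `SessionRetry.runWithRetry` and the `refreshSession` hook are hypothetical names chosen for illustration and are not taken from the PR's `AzkabanClient` code.

```java
import java.util.concurrent.Callable;

/** Hypothetical marker for "the server rejected our session id". */
final class SessionExpiredException extends Exception {
  SessionExpiredException(String message) {
    super(message);
  }
}

/** Hypothetical helper: run a call, refreshing the session and retrying when it has expired. */
final class SessionRetry {
  private SessionRetry() {
  }

  static <V> V runWithRetry(Callable<V> call, Runnable refreshSession, int maxAttempts)
      throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    SessionExpiredException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return call.call();
      } catch (SessionExpiredException e) {
        // Only an expired session is retried; any other failure propagates immediately.
        last = e;
        if (attempt < maxAttempts) {
          refreshSession.run();
        }
      }
    }
    // Every attempt failed with an expired session; surface the last failure.
    throw last;
  }
}
```

Keeping the retry loop separate from the individual calls means each client method only has to describe its request, which appears to be the direction the callable-based refactoring in this PR takes.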
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yukuai518 commented on issue #2555: [GOBBLIN-683] Add azkaban client retry logic.

2019-02-14 Thread GitBox
yukuai518 commented on issue #2555: [GOBBLIN-683] Add azkaban client retry 
logic.
URL: 
https://github.com/apache/incubator-gobblin/pull/2555#issuecomment-463753284
 
 
   @sv2000 Please review




[GitHub] sv2000 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.

2019-02-14 Thread GitBox
sv2000 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban 
client retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555#discussion_r257053131
 
 

 ##
 File path: 
gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 ##
 @@ -402,97 +329,71 @@ public AzkabanExecuteFlowStatus executeFlowWithOptions(
*
* @return The status object which contains success status and execution id.
*/
-  public AzkabanExecuteFlowStatus executeFlow(
-  String projectName,
-  String flowName,
-  Map flowParameters) {
+  public AzkabanExecuteFlowStatus executeFlow(String projectName,
+  String flowName,
+  Map 
flowParameters) throws AzkabanClientException {
 return executeFlowWithOptions(projectName, flowName, null, flowParameters);
   }
 
   /**
* Cancel a flow by execution id.
*/
-  public AzkabanClientStatus cancelFlow(int execId) {
-try {
-  refreshSession();
-  List nvps = new ArrayList<>();
-  nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "cancelFlow"));
-  nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, 
this.sessionId));
-  nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, 
String.valueOf(execId)));
-
-  Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, 
"application/x-www-form-urlencoded");
-  Header requestType = new BasicHeader("X-Requested-With", 
"XMLHttpRequest");
-
-  HttpGet httpGet = new HttpGet(url + "/executor?" + 
URLEncodedUtils.format(nvps, "UTF-8"));
-  httpGet.setHeaders(new Header[]{contentType, requestType});
-
-  CloseableHttpResponse response = this.httpClient.execute(httpGet);
-  try {
-handleResponse(response);
-return new AzkabanClientStatus.SUCCESS();
-  } finally {
-response.close();
-  }
-} catch (Exception e) {
-  return new AzkabanClientStatus.FAIL("", e);
-}
+  public AzkabanClientStatus cancelFlow(String execId) throws 
AzkabanClientException {
+AzkabanMultiCallables.CancelFlowCallable callable =
+new AzkabanMultiCallables.CancelFlowCallable(this,
+execId);
+
+return runWithRetry(callable, AzkabanClientStatus.class);
   }
 
+  /**
+   * Fetch an execution log.
+   */
+  public AzkabanClientStatus fetchExecutionLog(String execId,
+   String jobId,
+   String offset,
+   String length,
+   File ouf) throws 
AzkabanClientException {
+AzkabanMultiCallables.FetchExecLogCallable callable =
+new AzkabanMultiCallables.FetchExecLogCallable(this,
+execId,
+jobId,
+offset,
+length,
+ouf);
+
+return runWithRetry(callable, AzkabanClientStatus.class);
+  }
 
   /**
-   * Given an execution id, fetches all the detailed information of that 
execution, including a list of all the job executions.
+   * Given an execution id, fetches all the detailed information of that 
execution,
+   * including a list of all the job executions.
*
* @param execId execution id to be fetched.
*
-   * @return The status object which contains success status and all the 
detailed information of that execution.
+   * @return The status object which contains success status and all the 
detailed
+   * information of that execution.
*/
-  public AzkabanFetchExecuteFlowStatus fetchFlowExecution (String execId) {
-try {
-  refreshSession();
-  List nvps = new ArrayList<>();
-  nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, 
"fetchexecflow"));
-  nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, 
this.sessionId));
-  nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, execId));
-
-  Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, 
"application/x-www-form-urlencoded");
-  Header requestType = new BasicHeader("X-Requested-With", 
"XMLHttpRequest");
-
-  HttpGet httpGet = new HttpGet(url + "/executor?" + 
URLEncodedUtils.format(nvps, "UTF-8"));
-  httpGet.setHeaders(new Header[]{contentType, requestType});
-
-  CloseableHttpResponse response = this.httpClient.execute(httpGet);
-  try {
-Map map = handleResponse(response);
-return new AzkabanFetchExecuteFlowStatus(new 
AzkabanFetchExecuteFlowStatus.Execution(map));
-  } finally {
-response.close();
-  }
-} catch (Exception e) {
-  return new AzkabanFetchExecuteFlowStatus("Azkaban client cannot "
-  + "fetch execId " + execId, e);
-}
-  }
+  public AzkabanFetchExecuteFlowStatus fetchFlowExecution(String execId) 
throws AzkabanClientException {
+AzkabanMultiCallables.FetchF

[GitHub] sv2000 opened a new pull request #2556: GOBBLIN-684: Ensure buffered messages are flushed before close() in K…

2019-02-14 Thread GitBox
sv2000 opened a new pull request #2556: GOBBLIN-684: Ensure buffered messages 
are flushed before close() in K…
URL: https://github.com/apache/incubator-gobblin/pull/2556
 
 
   …afkaProducerPusher.
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-684
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   Currently, when KafkaProducerPusher is closed, it invokes 
KafkaProducer#close(). However, close() only guarantees delivery of in-flight 
messages, not the messages in the producer buffer waiting to be sent out. This 
results in data loss.
   
   The fix ensures that we call flush() before close(). As a result, any 
buffered messages are immediately pushed out and we block until the messages 
are acked. 
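
The flush-before-close ordering described above can be illustrated with a toy model. This is only a sketch under assumptions: `ToyProducer` is a hypothetical stand-in whose `close()` drops whatever is still buffered (mirroring the behaviour this description attributes to the producer), and `FlushingPusher` is a hypothetical wrapper; neither is the Kafka client API nor the PR's code.

```java
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a producer whose close() drops anything still buffered. */
final class ToyProducer implements Flushable, Closeable {
  private final List<String> buffer = new ArrayList<>();
  private final List<String> delivered = new ArrayList<>();

  void send(String message) {
    buffer.add(message);
  }

  /** Deliver everything that is currently buffered. */
  @Override
  public void flush() {
    delivered.addAll(buffer);
    buffer.clear();
  }

  /** Discards buffered messages, which is the data-loss scenario being modelled. */
  @Override
  public void close() {
    buffer.clear();
  }

  List<String> delivered() {
    return delivered;
  }
}

/** Hypothetical pusher whose close() always flushes before closing the producer. */
final class FlushingPusher implements Closeable {
  private final ToyProducer producer;

  FlushingPusher(ToyProducer producer) {
    this.producer = producer;
  }

  void push(String message) {
    producer.send(message);
  }

  @Override
  public void close() throws IOException {
    try {
      producer.flush();
    } finally {
      // Close even if flush fails, so resources are not leaked.
      producer.close();
    }
  }
}
```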
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   Tested in real deployment.
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] yukuai518 opened a new pull request #2557: [GOBBLIN-685] Add dump jstack for EmbeddedGobblin

2019-02-15 Thread GitBox
yukuai518 opened a new pull request #2557: [GOBBLIN-685] Add dump jstack for 
EmbeddedGobblin
URL: https://github.com/apache/incubator-gobblin/pull/2557
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-685
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
  In ticket https://jira01.corp.linkedin.com:8443/browse/ETL-8477, we don't 
know why the thread would time out, so two actions were taken:
  1) When a timeout happens, we dump the jstack.
  2) Make the cancellation executor a daemon thread, so that it won't be the 
only hanging thread when no other non-daemon threads are active.
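
Both actions can be sketched with the standard library alone. The helper below is hypothetical (`ThreadDumps`, `dumpAllStacks` and `newDaemonExecutor` are illustrative names, not the PR's implementation): it renders a jstack-like dump through `Thread.getAllStackTraces()` and builds a single-thread executor whose worker is a daemon thread.

```java
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Hypothetical helpers illustrating a stack dump and a daemon executor. */
final class ThreadDumps {
  private ThreadDumps() {
  }

  /** Render the stack of every live thread, similar in spirit to jstack output. */
  static String dumpAllStacks() {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
      Thread t = entry.getKey();
      sb.append('"').append(t.getName()).append("\" daemon=").append(t.isDaemon())
          .append(" state=").append(t.getState()).append('\n');
      for (StackTraceElement frame : entry.getValue()) {
        sb.append("\tat ").append(frame).append('\n');
      }
      sb.append('\n');
    }
    return sb.toString();
  }

  /** Single-thread executor whose worker is a daemon, so it cannot keep the JVM alive. */
  static ExecutorService newDaemonExecutor(String threadName) {
    ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable, threadName);
      t.setDaemon(true);
      return t;
    };
    return Executors.newSingleThreadExecutor(factory);
  }
}
```

A daemon worker does not prevent JVM exit, so a stuck cancellation task would no longer be the only thing holding the process open.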
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
  EmbeddedGobblinTest
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] yukuai518 commented on issue #2557: [GOBBLIN-685] Add dump jstack for EmbeddedGobblin

2019-02-15 Thread GitBox
yukuai518 commented on issue #2557: [GOBBLIN-685] Add dump jstack for 
EmbeddedGobblin
URL: 
https://github.com/apache/incubator-gobblin/pull/2557#issuecomment-464255003
 
 
   @htran1 please review.




[GitHub] htran1 commented on a change in pull request #2557: [GOBBLIN-685] Add dump jstack for EmbeddedGobblin

2019-02-15 Thread GitBox
htran1 commented on a change in pull request #2557: [GOBBLIN-685] Add dump 
jstack for EmbeddedGobblin
URL: https://github.com/apache/incubator-gobblin/pull/2557#discussion_r257429826
 
 

 ##
 File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblin.java
 ##
 @@ -352,6 +358,14 @@ public EmbeddedGobblin setShutdownTimeout(String timeout) 
{
 return setShutdownTimeout(Period.parse(timeout).getSeconds(), 
TimeUnit.SECONDS);
   }
 
+  /**
+   * Enable dumping jstack when error happens.
+   */
+  public EmbeddedGobblin setDumpJStack(boolean dumpJStack) {
 
 Review comment:
   `setDumpJStack` -> `setDumpJStackOnTimeout`.




[GitHub] asfgit closed pull request #2557: [GOBBLIN-685] Add dump jstack for EmbeddedGobblin

2019-02-15 Thread GitBox
asfgit closed pull request #2557: [GOBBLIN-685] Add dump jstack for 
EmbeddedGobblin
URL: https://github.com/apache/incubator-gobblin/pull/2557
 
 
   




[GitHub] arjun4084346 opened a new pull request #2558: [GOBBLIN-686] Enhance schema comparison

2019-02-16 Thread GitBox
arjun4084346 opened a new pull request #2558: [GOBBLIN-686] Enhance schema 
comparison
URL: https://github.com/apache/incubator-gobblin/pull/2558
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   @sv2000  @htran1  please review.
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-XXX
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   Write a utility method to validate the compatibility of two Avro schemas, 
with the option to compare only fields.
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   added unit tests in AvroUtilsTest
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] htran1 commented on a change in pull request #2556: GOBBLIN-684: Ensure buffered messages are flushed before close() in K…

2019-02-18 Thread GitBox
htran1 commented on a change in pull request #2556: GOBBLIN-684: Ensure 
buffered messages are flushed before close() in K…
URL: https://github.com/apache/incubator-gobblin/pull/2556#discussion_r257845882
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
 ##
 @@ -47,6 +51,9 @@
   private final String topic;
   private final KafkaProducer producer;
   private final Closer closer;
+  private final Queue> futures = new 
LinkedBlockingDeque<>();
 
 Review comment:
   Can you add a comment that this queue must have a capacity (currently 
unlimited) of at least `MAX_NUM_FUTURES_TO_BUFFER` plus a concurrency buffer, 
since a non-blocking `offer` is used? Just to prevent someone from 
inadvertently adding an incorrect limit.
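
To make the concern concrete, here is a small standalone sketch (the class `OfferDemo` and its method are hypothetical, not part of the PR) showing that `offer` on a bounded `LinkedBlockingDeque` returns `false` instead of blocking once the deque is full, so elements would be dropped silently if the capacity were set too low.

```java
import java.util.concurrent.LinkedBlockingDeque;

/** Hypothetical demo of how a bounded deque treats non-blocking offers. */
final class OfferDemo {
  private OfferDemo() {
  }

  /** Returns how many of the attempted offers were rejected by a deque of the given capacity. */
  static int rejectedOffers(int capacity, int attempts) {
    LinkedBlockingDeque<Integer> queue = new LinkedBlockingDeque<>(capacity);
    int rejected = 0;
    for (int i = 0; i < attempts; i++) {
      // offer() never blocks: on a full bounded deque it returns false and the element is lost.
      if (!queue.offer(i)) {
        rejected++;
      }
    }
    return rejected;
  }
}
```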




[GitHub] htran1 commented on a change in pull request #2556: GOBBLIN-684: Ensure buffered messages are flushed before close() in K…

2019-02-18 Thread GitBox
htran1 commented on a change in pull request #2556: GOBBLIN-684: Ensure 
buffered messages are flushed before close() in K…
URL: https://github.com/apache/incubator-gobblin/pull/2556#discussion_r257847795
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
 ##
 @@ -80,17 +88,46 @@ public KafkaKeyValueProducerPusher(String brokers, String 
topic) {
*/
   public void pushMessages(List> messages) {
 for (Pair message: messages) {
-  this.producer.send(new ProducerRecord<>(topic, message.getKey(), 
message.getValue()), (recordMetadata, e) -> {
+  this.futures.offer(this.producer.send(new ProducerRecord<>(topic, 
message.getKey(), message.getValue()), (recordMetadata, e) -> {
 if (e != null) {
   log.error("Failed to send message to topic {} due to exception: ", 
topic, e);
 }
-  });
+  }));
+}
+
+//Accumulate futures returned from send() into a buffer; will be used to 
simulate flush by calling get() on
+// each of the accumulated futures.
+if (this.futures.size() >= MAX_NUM_FUTURES_TO_BUFFER) {
+  flush(MAX_NUM_FUTURES_TO_BUFFER);
+  this.futures.clear();
 
 Review comment:
   This won't be thread safe since if there are concurrent calls to 
pushMessages() some unacknowledged futures may be cleared. `flush` is already 
removing with `poll`, so I think this can be removed.
   
   Also, this forces a synchronous wait every 1000 messages. Should there be 
different max buffer and flush amounts to avoid waiting for the newest messages?
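
One way to picture the suggestion is sketched below, under assumptions: `FutureBuffer` is a hypothetical class, not the PR's pusher. Each future is removed with `poll` as it is waited on, so no separate `clear()` is needed and futures added concurrently by other callers are never discarded without being waited on. Passing a `max` smaller than the buffer size leaves the newest futures pending.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical buffer of send futures that is drained with poll() instead of clear(). */
final class FutureBuffer {
  private final Queue<Future<?>> futures = new ConcurrentLinkedQueue<>();

  void add(Future<?> future) {
    futures.offer(future);
  }

  /** Wait on at most max of the oldest futures; returns how many were waited on. */
  int flush(int max) throws InterruptedException {
    int drained = 0;
    Future<?> next;
    while (drained < max && (next = futures.poll()) != null) {
      try {
        next.get();
      } catch (ExecutionException e) {
        // A failed send still counts as drained; a real pusher would log the failure.
      }
      drained++;
    }
    return drained;
  }

  int pending() {
    return futures.size();
  }
}
```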




[GitHub] sv2000 commented on a change in pull request #2556: GOBBLIN-684: Ensure buffered messages are flushed before close() in K…

2019-02-18 Thread GitBox
sv2000 commented on a change in pull request #2556: GOBBLIN-684: Ensure 
buffered messages are flushed before close() in K…
URL: https://github.com/apache/incubator-gobblin/pull/2556#discussion_r257866178
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
 ##
 @@ -47,6 +51,9 @@
   private final String topic;
   private final KafkaProducer producer;
   private final Closer closer;
+  private final Queue> futures = new 
LinkedBlockingDeque<>();
 
 Review comment:
   Thanks! Added more comments to explain how to set the capacity for the 
buffer.




[GitHub] sv2000 commented on a change in pull request #2556: GOBBLIN-684: Ensure buffered messages are flushed before close() in K…

2019-02-18 Thread GitBox
sv2000 commented on a change in pull request #2556: GOBBLIN-684: Ensure 
buffered messages are flushed before close() in K…
URL: https://github.com/apache/incubator-gobblin/pull/2556#discussion_r257866302
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
 ##
 @@ -80,17 +88,46 @@ public KafkaKeyValueProducerPusher(String brokers, String 
topic) {
*/
   public void pushMessages(List> messages) {
 for (Pair message: messages) {
-  this.producer.send(new ProducerRecord<>(topic, message.getKey(), 
message.getValue()), (recordMetadata, e) -> {
+  this.futures.offer(this.producer.send(new ProducerRecord<>(topic, 
message.getKey(), message.getValue()), (recordMetadata, e) -> {
 if (e != null) {
   log.error("Failed to send message to topic {} due to exception: ", 
topic, e);
 }
-  });
+  }));
+}
+
+//Accumulate futures returned from send() into a buffer; will be used to 
simulate flush by calling get() on
+// each of the accumulated futures.
+if (this.futures.size() >= MAX_NUM_FUTURES_TO_BUFFER) {
+  flush(MAX_NUM_FUTURES_TO_BUFFER);
+  this.futures.clear();
 
 Review comment:
   Good catch! Yes, the clear() is not needed. Also changed the pushMessages() 
method so that the flush excludes the most recently added messages.




[GitHub] ibuenros commented on issue #2548: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-02-19 Thread GitBox
ibuenros commented on issue #2548: [GOBBLIN-677] - Allow early termination of 
Gobblin jobs based on a predicate on the job progress
URL: 
https://github.com/apache/incubator-gobblin/pull/2548#issuecomment-465215902
 
 
   @sv2000 addressed comments, please take another look?




[GitHub] sv2000 opened a new pull request #2559: GOBBLIN-687: Pass TopologySpec map to DagManager to allow reuse of Sp…

2019-02-19 Thread GitBox
sv2000 opened a new pull request #2559: GOBBLIN-687: Pass TopologySpec map to 
DagManager to allow reuse of Sp…
URL: https://github.com/apache/incubator-gobblin/pull/2559
 
 
   …ecExecutors during DAG deserialization.
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-687
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   DagManager maintains state of all currently executing DAGs, by serializing 
each DAG on compilation and persisting it to a durable store. The serialized 
DAG includes Job config as well as the SpecExecutor config for each job in the 
DAG. This is done to correctly resume execution of DAGs in case of service 
restarts or leadership change. 
   
   Currently, on service restart/leadership change, the new master 
de-serializes SpecExecutor config and creates a SpecExecutor instance for each 
job in the DAG. If the number of DAGs is large, this can result in many 
connections to the underlying executor instance. The proposed fix allows the 
DagManager to re-use the SpecExecutor instances created by the 
TopologySpecFactory when it deserializes a DAG.
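
The reuse described above can be sketched as a lookup into a shared map instead of a fresh construction per job. Everything in this sketch is hypothetical (`ExecutorRegistry`, its nested `Executor` type, the URI-keyed map); it only illustrates the idea and is not the DagManager code.

```java
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ExecutorRegistry {
  /** Hypothetical stand-in for a SpecExecutor that holds a connection. */
  public static final class Executor {
    public final URI uri;

    public Executor(URI uri) {
      this.uri = uri;
    }
  }

  private final Map<URI, Executor> executorsByUri = new ConcurrentHashMap<>();

  /** Registers an instance created once at startup. */
  public void register(Executor executor) {
    this.executorsByUri.put(executor.uri, executor);
  }

  /** On deserialization, resolves the serialized URI to the existing instance. */
  public Executor resolve(URI uri) {
    Executor executor = this.executorsByUri.get(uri);
    if (executor == null) {
      throw new IllegalArgumentException("No executor registered for " + uri);
    }
    return executor;
  }
}
```

With this shape, the number of executor instances is bounded by the number of registered entries rather than by the number of jobs across all DAGs.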
   
   
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   Enhanced unit tests in FSDagStateStoreTest and tested in local environment.
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] sv2000 commented on issue #2559: GOBBLIN-687: Pass TopologySpec map to DagManager to allow reuse of Sp…

2019-02-19 Thread GitBox
sv2000 commented on issue #2559: GOBBLIN-687: Pass TopologySpec map to 
DagManager to allow reuse of Sp…
URL: 
https://github.com/apache/incubator-gobblin/pull/2559#issuecomment-465423142
 
 
   @autumnust please review this PR.




[GitHub] sv2000 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.

2019-02-19 Thread GitBox
sv2000 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban 
client retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555#discussion_r258336055
 
 

 ##
 File path: 
gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 ##
 @@ -88,19 +96,23 @@ protected AzkabanClient(String username,
   String url,
   long sessionExpireInMin,
   CloseableHttpClient httpClient,
-  SessionManager sessionManager)
+  SessionManager sessionManager,
+  ExecutorService executorService)
   throws AzkabanClientException {
 this.username = username;
 this.password = password;
 this.url = url;
 this.sessionExpireInMin = sessionExpireInMin;
 this.httpClient = httpClient;
 this.sessionManager = sessionManager;
+this.executorService = executorService;
 this.initializeClient();
 this.initializeSessionManager();
+this.intializeExecutorService();
 this.retryer = RetryerBuilder.newBuilder()
 .retryIfExceptionOfType(InvalidSessionException.class)
-.withWaitStrategy(WaitStrategies.exponentialWait(10, TimeUnit.SECONDS))
+.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(10, 
TimeUnit.SECONDS, this.executorService))
+.withWaitStrategy(WaitStrategies.exponentialWait(60, TimeUnit.SECONDS))
 .withStopStrategy(StopStrategies.stopAfterAttempt(5))
 
 Review comment:
   3 attempts should be sufficient I think. 




[GitHub] sv2000 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.

2019-02-19 Thread GitBox
sv2000 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban 
client retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555#discussion_r258336828
 
 

 ##
 File path: 
gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 ##
 @@ -123,6 +135,12 @@ private void initializeSessionManager() throws 
AzkabanClientException {
 }
   }
 
+  private void intializeExecutorService() {
+if (this.executorService == null) {
+  this.executorService = Executors.newFixedThreadPool(300);
 
 Review comment:
   300 threads may be too many. Is 30 too low? Also, do you think some of the 
retryer settings should be made configurable?
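
One way the settings could be made configurable, as a sketch only: read them from `java.util.Properties` with defaults. The `RetrySettings` class and its key names are made up for illustration and are not existing Gobblin configuration keys; the defaults of 30 threads and 3 attempts are simply the values floated in this review.

```java
import java.util.Properties;

final class RetrySettings {
  // Hypothetical key names, not real Gobblin configuration keys.
  public static final String THREAD_POOL_SIZE_KEY = "azkaban.client.threadPoolSize";
  public static final String MAX_ATTEMPTS_KEY = "azkaban.client.maxAttempts";

  public final int threadPoolSize;
  public final int maxAttempts;

  public RetrySettings(Properties props) {
    this.threadPoolSize = positiveInt(props, THREAD_POOL_SIZE_KEY, 30);
    this.maxAttempts = positiveInt(props, MAX_ATTEMPTS_KEY, 3);
  }

  /** Parses a positive integer setting, falling back to the default when the key is absent. */
  private static int positiveInt(Properties props, String key, int defaultValue) {
    int value = Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)).trim());
    if (value <= 0) {
      throw new IllegalArgumentException(key + " must be positive: " + value);
    }
    return value;
  }
}
```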




[GitHub] sv2000 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.

2019-02-19 Thread GitBox
sv2000 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban 
client retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555#discussion_r258337554
 
 

 ##
 File path: 
gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 ##
 @@ -88,19 +96,23 @@ protected AzkabanClient(String username,
   String url,
   long sessionExpireInMin,
   CloseableHttpClient httpClient,
-  SessionManager sessionManager)
+  SessionManager sessionManager,
+  ExecutorService executorService)
   throws AzkabanClientException {
 this.username = username;
 this.password = password;
 this.url = url;
 this.sessionExpireInMin = sessionExpireInMin;
 this.httpClient = httpClient;
 this.sessionManager = sessionManager;
+this.executorService = executorService;
 this.initializeClient();
 this.initializeSessionManager();
+this.intializeExecutorService();
 this.retryer = RetryerBuilder.newBuilder()
 .retryIfExceptionOfType(InvalidSessionException.class)
-.withWaitStrategy(WaitStrategies.exponentialWait(10, TimeUnit.SECONDS))
+.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(10, 
TimeUnit.SECONDS, this.executorService))
 
 Review comment:
   Is this really needed? I am a little concerned about creating an executor 
service with a fixed thread pool of 300 threads per client. With multiple 
Azkaban executor instances, this can lead to a lot of threads. 




[GitHub] autumnust commented on a change in pull request #2559: GOBBLIN-687: Pass TopologySpec map to DagManager to allow reuse of Sp…

2019-02-19 Thread GitBox
autumnust commented on a change in pull request #2559: GOBBLIN-687: Pass 
TopologySpec map to DagManager to allow reuse of Sp…
URL: https://github.com/apache/incubator-gobblin/pull/2559#discussion_r258338558
 
 

 ##
 File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 ##
 @@ -232,6 +231,7 @@ public GobblinServiceManager(String serviceName, String 
serviceId, Config config
   this.serviceLauncher.addService(this.scheduler);
 }
 
+
 
 Review comment:
   Clean up?




[GitHub] autumnust commented on a change in pull request #2559: GOBBLIN-687: Pass TopologySpec map to DagManager to allow reuse of Sp…

2019-02-19 Thread GitBox
autumnust commented on a change in pull request #2559: GOBBLIN-687: Pass 
TopologySpec map to DagManager to allow reuse of Sp…
URL: https://github.com/apache/incubator-gobblin/pull/2559#discussion_r258341565
 
 

 ##
 File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 ##
 @@ -354,7 +354,10 @@ private void handleLeadershipChange(NotificationContext 
changeContext) {
   }
 
   if (this.isDagManagerEnabled) {
-this.dagManager.setActive(true);
+//Activate DagManager only if TopologyCatalog is initialized. If not; 
skip activation.
+if (this.topologyCatalog.getInitComplete().getCount() <= 0) {
 
 Review comment:
   Is `== 0` more accurate ? 
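
For reference, a small sketch that assumes `getInitComplete()` returns a `java.util.concurrent.CountDownLatch` (the diff does not show its type). A latch's count stops at zero, so `== 0` and `<= 0` behave the same; `== 0` just states the intent more directly. `LatchCheck` is an illustrative name.

```java
import java.util.concurrent.CountDownLatch;

class LatchCheck {
  /** Returns true once the latch has been fully counted down. */
  public static boolean isInitialized(CountDownLatch initComplete) {
    return initComplete.getCount() == 0;
  }
}
```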




[GitHub] autumnust commented on a change in pull request #2559: GOBBLIN-687: Pass TopologySpec map to DagManager to allow reuse of Sp…

2019-02-19 Thread GitBox
autumnust commented on a change in pull request #2559: GOBBLIN-687: Pass 
TopologySpec map to DagManager to allow reuse of Sp…
URL: https://github.com/apache/incubator-gobblin/pull/2559#discussion_r258344733
 
 

 ##
 File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 ##
 @@ -218,6 +211,12 @@ public GobblinServiceManager(String serviceName, String 
serviceId, Config config
   this.helixManager = Optional.absent();
 }
 
+// Initialize DagManager
 
 Review comment:
   Previously, `DagManager` was only initialized when `isFlowCatalogEnabled` 
was set to true, but moving it out means this condition no longer applies. Is 
that on purpose?




[GitHub] autumnust commented on a change in pull request #2559: GOBBLIN-687: Pass TopologySpec map to DagManager to allow reuse of Sp…

2019-02-19 Thread GitBox
autumnust commented on a change in pull request #2559: GOBBLIN-687: Pass 
TopologySpec map to DagManager to allow reuse of Sp…
URL: https://github.com/apache/incubator-gobblin/pull/2559#discussion_r258344143
 
 

 ##
 File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 ##
 @@ -107,6 +104,10 @@ public Orchestrator(Config config, 
Optional topologyCatalog, Op
 | ClassNotFoundException e) {
   throw new RuntimeException(e);
 }
+//Pass the topologySpecMap to the DagManager
 
 Review comment:
   Shall we move this block under line 93?




[GitHub] htran1 commented on a change in pull request #2556: GOBBLIN-684: Ensure buffered messages are flushed before close() in K…

2019-02-19 Thread GitBox
htran1 commented on a change in pull request #2556: GOBBLIN-684: Ensure 
buffered messages are flushed before close() in K…
URL: https://github.com/apache/incubator-gobblin/pull/2556#discussion_r258348552
 
 

 ##
 File path: 
gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/metrics/kafka/KafkaKeyValueProducerPusher.java
 ##
 @@ -80,17 +98,48 @@ public KafkaKeyValueProducerPusher(String brokers, String 
topic) {
*/
   public void pushMessages(List> messages) {
 for (Pair message: messages) {
-  this.producer.send(new ProducerRecord<>(topic, message.getKey(), 
message.getValue()), (recordMetadata, e) -> {
+  this.futures.offer(this.producer.send(new ProducerRecord<>(topic, 
message.getKey(), message.getValue()), (recordMetadata, e) -> {
 if (e != null) {
   log.error("Failed to send message to topic {} due to exception: ", 
topic, e);
 }
-  });
+  }));
+}
+
+//Once the low watermark of numFuturesToBuffer is hit, start flushing 
messages from the futures
+// buffer. In order to avoid blocking on newest messages added to futures 
queue, we only invoke future.get() on
+// the oldest messages in the futures buffer. The number of messages to 
flush is same as the number of messages added
+// in the current call. Note this does not completely avoid calling 
future.get() on the newer messages e.g. when
+// multiple threads enter the if{} block concurrently, and invoke flush().
+if (this.futures.size() >= this.numFuturesToBuffer) {
+  flush(messages.size());
+}
+  }
+
+  /**
+   * Flush any records that may be present in the producer buffer upto a 
maximum of numRecordsToFlush.
+   * This method is needed since Kafka 0.8 producer does not have a flush() 
API. In the absence of the flush()
+   * implementation, records which are present in the buffer but not in-flight 
may not be delivered at all when close()
+   * is called, leading to data loss.
+   * @param numRecordsToFlush
+   */
+  private void flush(long numRecordsToFlush) {
+log.info("Flushing records from producer buffer");
 
 Review comment:
   Can you make this log and the one at the end debug level or print it only 
when `numRecordsToFlush == Long.MAX_VALUE`? I think this may flood the log 
since flush() is called whenever the futures queue has hit the limit.




[GitHub] asfgit closed pull request #2556: GOBBLIN-684: Ensure buffered messages are flushed before close() in K…

2019-02-20 Thread GitBox
asfgit closed pull request #2556: GOBBLIN-684: Ensure buffered messages are 
flushed before close() in K…
URL: https://github.com/apache/incubator-gobblin/pull/2556
 
 
   




[GitHub] sv2000 commented on a change in pull request #2559: GOBBLIN-687: Pass TopologySpec map to DagManager to allow reuse of Sp…

2019-02-20 Thread GitBox
sv2000 commented on a change in pull request #2559: GOBBLIN-687: Pass 
TopologySpec map to DagManager to allow reuse of Sp…
URL: https://github.com/apache/incubator-gobblin/pull/2559#discussion_r258622933
 
 

 ##
 File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 ##
 @@ -354,7 +354,10 @@ private void handleLeadershipChange(NotificationContext 
changeContext) {
   }
 
   if (this.isDagManagerEnabled) {
-this.dagManager.setActive(true);
+//Activate DagManager only if TopologyCatalog is initialized. If not; 
skip activation.
+if (this.topologyCatalog.getInitComplete().getCount() <= 0) {
 
 Review comment:
   Changed it to == 0. 




[GitHub] sv2000 commented on a change in pull request #2559: GOBBLIN-687: Pass TopologySpec map to DagManager to allow reuse of Sp…

2019-02-20 Thread GitBox
sv2000 commented on a change in pull request #2559: GOBBLIN-687: Pass 
TopologySpec map to DagManager to allow reuse of Sp…
URL: https://github.com/apache/incubator-gobblin/pull/2559#discussion_r258623176
 
 

 ##
 File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 ##
 @@ -107,6 +104,10 @@ public Orchestrator(Config config, 
Optional topologyCatalog, Op
 | ClassNotFoundException e) {
   throw new RuntimeException(e);
 }
+//Pass the topologySpecMap to the DagManager
 
 Review comment:
   We need this after the spec compiler initialization, to make sure that the 
topologySpecMap has been instantiated. Added comment to make it explicit.




[GitHub] sv2000 commented on a change in pull request #2559: GOBBLIN-687: Pass TopologySpec map to DagManager to allow reuse of Sp…

2019-02-20 Thread GitBox
sv2000 commented on a change in pull request #2559: GOBBLIN-687: Pass 
TopologySpec map to DagManager to allow reuse of Sp…
URL: https://github.com/apache/incubator-gobblin/pull/2559#discussion_r258623894
 
 

 ##
 File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 ##
 @@ -218,6 +211,12 @@ public GobblinServiceManager(String serviceName, String 
serviceId, Config config
   this.helixManager = Optional.absent();
 }
 
+// Initialize DagManager
 
 Review comment:
   Thanks for noticing this! The DagManager initialization does not depend on 
whether FlowCatalog is enabled or not. We have a separate config to 
enable/disable DagManager. I can't recall why it was inside if 
(isFlowCatalogEnabled). Yes, the move outside the if block is intentional. 




[GitHub] sv2000 commented on issue #2559: GOBBLIN-687: Pass TopologySpec map to DagManager to allow reuse of Sp…

2019-02-20 Thread GitBox
sv2000 commented on issue #2559: GOBBLIN-687: Pass TopologySpec map to 
DagManager to allow reuse of Sp…
URL: 
https://github.com/apache/incubator-gobblin/pull/2559#issuecomment-465704819
 
 
   @autumnust addressed your comments. Can you please take another look?




[GitHub] sv2000 commented on a change in pull request #2559: GOBBLIN-687: Pass TopologySpec map to DagManager to allow reuse of Sp…

2019-02-20 Thread GitBox
sv2000 commented on a change in pull request #2559: GOBBLIN-687: Pass 
TopologySpec map to DagManager to allow reuse of Sp…
URL: https://github.com/apache/incubator-gobblin/pull/2559#discussion_r258623964
 
 

 ##
 File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 ##
 @@ -232,6 +231,7 @@ public GobblinServiceManager(String serviceName, String 
serviceId, Config config
   this.serviceLauncher.addService(this.scheduler);
 }
 
+
 
 Review comment:
   Done.




[GitHub] yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.

2019-02-20 Thread GitBox
yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add 
azkaban client retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555#discussion_r258633224
 
 

 ##
 File path: 
gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 ##
 @@ -402,97 +329,71 @@ public AzkabanExecuteFlowStatus executeFlowWithOptions(
*
* @return The status object which contains success status and execution id.
*/
-  public AzkabanExecuteFlowStatus executeFlow(
-  String projectName,
-  String flowName,
-  Map flowParameters) {
+  public AzkabanExecuteFlowStatus executeFlow(String projectName,
+  String flowName,
+  Map 
flowParameters) throws AzkabanClientException {
 return executeFlowWithOptions(projectName, flowName, null, flowParameters);
   }
 
   /**
* Cancel a flow by execution id.
*/
-  public AzkabanClientStatus cancelFlow(int execId) {
-try {
-  refreshSession();
-  List nvps = new ArrayList<>();
-  nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, "cancelFlow"));
-  nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, 
this.sessionId));
-  nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, 
String.valueOf(execId)));
-
-  Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, 
"application/x-www-form-urlencoded");
-  Header requestType = new BasicHeader("X-Requested-With", 
"XMLHttpRequest");
-
-  HttpGet httpGet = new HttpGet(url + "/executor?" + 
URLEncodedUtils.format(nvps, "UTF-8"));
-  httpGet.setHeaders(new Header[]{contentType, requestType});
-
-  CloseableHttpResponse response = this.httpClient.execute(httpGet);
-  try {
-handleResponse(response);
-return new AzkabanClientStatus.SUCCESS();
-  } finally {
-response.close();
-  }
-} catch (Exception e) {
-  return new AzkabanClientStatus.FAIL("", e);
-}
+  public AzkabanClientStatus cancelFlow(String execId) throws 
AzkabanClientException {
+AzkabanMultiCallables.CancelFlowCallable callable =
+new AzkabanMultiCallables.CancelFlowCallable(this,
+execId);
+
+return runWithRetry(callable, AzkabanClientStatus.class);
   }
 
+  /**
+   * Fetch an execution log.
+   */
+  public AzkabanClientStatus fetchExecutionLog(String execId,
+   String jobId,
+   String offset,
+   String length,
+   File ouf) throws 
AzkabanClientException {
+AzkabanMultiCallables.FetchExecLogCallable callable =
+new AzkabanMultiCallables.FetchExecLogCallable(this,
+execId,
+jobId,
+offset,
+length,
+ouf);
+
+return runWithRetry(callable, AzkabanClientStatus.class);
+  }
 
   /**
-   * Given an execution id, fetches all the detailed information of that 
execution, including a list of all the job executions.
+   * Given an execution id, fetches all the detailed information of that 
execution,
+   * including a list of all the job executions.
*
* @param execId execution id to be fetched.
*
-   * @return The status object which contains success status and all the 
detailed information of that execution.
+   * @return The status object which contains success status and all the 
detailed
+   * information of that execution.
*/
-  public AzkabanFetchExecuteFlowStatus fetchFlowExecution (String execId) {
-try {
-  refreshSession();
-  List nvps = new ArrayList<>();
-  nvps.add(new BasicNameValuePair(AzkabanClientParams.AJAX, 
"fetchexecflow"));
-  nvps.add(new BasicNameValuePair(AzkabanClientParams.SESSION_ID, 
this.sessionId));
-  nvps.add(new BasicNameValuePair(AzkabanClientParams.EXECID, execId));
-
-  Header contentType = new BasicHeader(HttpHeaders.CONTENT_TYPE, 
"application/x-www-form-urlencoded");
-  Header requestType = new BasicHeader("X-Requested-With", 
"XMLHttpRequest");
-
-  HttpGet httpGet = new HttpGet(url + "/executor?" + 
URLEncodedUtils.format(nvps, "UTF-8"));
-  httpGet.setHeaders(new Header[]{contentType, requestType});
-
-  CloseableHttpResponse response = this.httpClient.execute(httpGet);
-  try {
-Map map = handleResponse(response);
-return new AzkabanFetchExecuteFlowStatus(new 
AzkabanFetchExecuteFlowStatus.Execution(map));
-  } finally {
-response.close();
-  }
-} catch (Exception e) {
-  return new AzkabanFetchExecuteFlowStatus("Azkaban client cannot "
-  + "fetch execId " + execId, e);
-}
-  }
+  public AzkabanFetchExecuteFlowStatus fetchFlowExecution(String execId) 
throws AzkabanClientException {
+AzkabanMultiCallables.Fet

[GitHub] yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.

2019-02-20 Thread GitBox
yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add 
azkaban client retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555#discussion_r258633837
 
 

 ##
 File path: 
gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 ##
 @@ -88,19 +96,23 @@ protected AzkabanClient(String username,
   String url,
   long sessionExpireInMin,
   CloseableHttpClient httpClient,
-  SessionManager sessionManager)
+  SessionManager sessionManager,
+  ExecutorService executorService)
   throws AzkabanClientException {
 this.username = username;
 this.password = password;
 this.url = url;
 this.sessionExpireInMin = sessionExpireInMin;
 this.httpClient = httpClient;
 this.sessionManager = sessionManager;
+this.executorService = executorService;
 this.initializeClient();
 this.initializeSessionManager();
+this.intializeExecutorService();
 this.retryer = RetryerBuilder.newBuilder()
 .retryIfExceptionOfType(InvalidSessionException.class)
-.withWaitStrategy(WaitStrategies.exponentialWait(10, TimeUnit.SECONDS))
+.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(10, 
TimeUnit.SECONDS, this.executorService))
+.withWaitStrategy(WaitStrategies.exponentialWait(60, TimeUnit.SECONDS))
 .withStopStrategy(StopStrategies.stopAfterAttempt(5))
 
 Review comment:
   Will change it 




[GitHub] yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.

2019-02-20 Thread GitBox
yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add 
azkaban client retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555#discussion_r258633782
 
 

 ##
 File path: 
gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 ##
 @@ -123,6 +135,12 @@ private void initializeSessionManager() throws 
AzkabanClientException {
 }
   }
 
+  private void intializeExecutorService() {
+if (this.executorService == null) {
+  this.executorService = Executors.newFixedThreadPool(300);
 
 Review comment:
   Sure, I can make it configurable




[GitHub] yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.

2019-02-20 Thread GitBox
yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add 
azkaban client retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555#discussion_r258635143
 
 

 ##
 File path: 
gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 ##
 @@ -88,19 +96,23 @@ protected AzkabanClient(String username,
   String url,
   long sessionExpireInMin,
   CloseableHttpClient httpClient,
-  SessionManager sessionManager)
+  SessionManager sessionManager,
+  ExecutorService executorService)
   throws AzkabanClientException {
 this.username = username;
 this.password = password;
 this.url = url;
 this.sessionExpireInMin = sessionExpireInMin;
 this.httpClient = httpClient;
 this.sessionManager = sessionManager;
+this.executorService = executorService;
 this.initializeClient();
 this.initializeSessionManager();
+this.intializeExecutorService();
 this.retryer = RetryerBuilder.newBuilder()
 .retryIfExceptionOfType(InvalidSessionException.class)
-.withWaitStrategy(WaitStrategies.exponentialWait(10, TimeUnit.SECONDS))
+.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(10, 
TimeUnit.SECONDS, this.executorService))
 
 Review comment:
   The decision depends on whether you want to share the Azkaban producer. If 
you share the producer, then 300 is okay. If you don't, each job request will 
have its own AzkabanClient, and you could use a single-thread pool. Since it 
depends on the implementation, I give users the option to call 
setExecutorService() themselves if they want to change the default behavior.
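
The time limit in the diff above runs each attempt on the supplied executor, which is why the pool size matters when one client is shared across many requests. A minimal sketch of that mechanism using only `java.util.concurrent` follows; `TimeLimitedCall` is a hypothetical helper written for illustration, not the guava-retrying implementation and not Gobblin code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: bounds the duration of one attempt by running it on an executor. */
final class TimeLimitedCall {
  private TimeLimitedCall() {}

  /** Submits the task to the executor and gives up once the limit elapses. */
  static <T> T call(Callable<T> task, long limit, TimeUnit unit, ExecutorService executor)
      throws TimeoutException, ExecutionException, InterruptedException {
    Future<T> future = executor.submit(task);
    try {
      return future.get(limit, unit);
    } catch (TimeoutException e) {
      // Interrupt the worker so its thread can return to the pool.
      future.cancel(true);
      throw e;
    }
  }
}
```

Every in-flight attempt occupies one pool thread until it finishes or is cancelled, so a shared client needs roughly as many threads as concurrent requests, while a per-request client can get by with one.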




[GitHub] yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.

2019-02-20 Thread GitBox
yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add 
azkaban client retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555#discussion_r258633782
 
 

 ##
 File path: 
gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 ##
 @@ -123,6 +135,12 @@ private void initializeSessionManager() throws 
AzkabanClientException {
 }
   }
 
+  private void intializeExecutorService() {
+if (this.executorService == null) {
+  this.executorService = Executors.newFixedThreadPool(300);
 
 Review comment:
   Sure, I can make it configurable




[GitHub] yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.

2019-02-20 Thread GitBox
yukuai518 commented on a change in pull request #2555: [GOBBLIN-683] Add 
azkaban client retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555#discussion_r258638119
 
 

 ##
 File path: 
gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
 ##
 @@ -123,6 +135,12 @@ private void initializeSessionManager() throws 
AzkabanClientException {
 }
   }
 
+  private void intializeExecutorService() {
+if (this.executorService == null) {
+  this.executorService = Executors.newFixedThreadPool(300);
 
 Review comment:
   @sv2000 I changed it back to 30 now. The whole idea is to let users supply 
their own executorService that meets their requirements, so this is just a 
default fallback.
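
The fallback described here can be sketched as follows. `ExecutorFallback` and its ownership flag are hypothetical names chosen for illustration; only the null check and the default of 30 come from the discussion above, and the merged code may differ.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical holder: uses the caller's executor if given, else a default pool. */
final class ExecutorFallback {
  static final int DEFAULT_POOL_SIZE = 30;

  private ExecutorService executorService;
  private boolean ownsExecutor;

  ExecutorFallback(ExecutorService provided) {
    this.executorService = provided;
    initializeExecutorService();
  }

  private void initializeExecutorService() {
    if (this.executorService == null) {
      // No executor supplied: fall back to a fixed pool owned by this object.
      this.executorService = Executors.newFixedThreadPool(DEFAULT_POOL_SIZE);
      this.ownsExecutor = true;
    }
  }

  ExecutorService getExecutorService() {
    return this.executorService;
  }

  boolean ownsExecutor() {
    return this.ownsExecutor;
  }

  /** Shuts down only the pool created here; a caller-supplied executor is left alone. */
  void close() {
    if (this.ownsExecutor) {
      this.executorService.shutdown();
    }
  }
}
```

Tracking ownership is an added design choice in this sketch: it avoids shutting down an executor that the caller may still be sharing with other clients.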




[GitHub] sv2000 opened a new pull request #2560: GOBBLIN-688: Make FsJobStatusRetriever config more scoped.

2019-02-20 Thread GitBox
sv2000 opened a new pull request #2560: GOBBLIN-688: Make FsJobStatusRetriever 
config more scoped.
URL: https://github.com/apache/incubator-gobblin/pull/2560
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-688
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   The proposed enhancement adds a configuration prefix to FsJobStatusRetriever 
config to make it more scoped.
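
As an illustration of what scoping a config by prefix means, here is a minimal sketch using `java.util.Properties` as a stand-in for Gobblin's config type. The class name `ScopedConfig` and the prefix used in the usage note are hypothetical; the actual prefix introduced by this PR is not shown in this message.

```java
import java.util.Properties;

/** Hypothetical helper: extracts the keys under a prefix and strips the prefix. */
final class ScopedConfig {
  private ScopedConfig() {}

  static Properties withPrefix(Properties all, String prefix) {
    String dotted = prefix.endsWith(".") ? prefix : prefix + ".";
    Properties scoped = new Properties();
    for (String key : all.stringPropertyNames()) {
      if (key.startsWith(dotted)) {
        // Keep only keys in scope, exposed without the prefix.
        scoped.setProperty(key.substring(dotted.length()), all.getProperty(key));
      }
    }
    return scoped;
  }
}
```

With an assumed prefix `retriever`, a key `retriever.fs.uri` becomes `fs.uri` in the scoped view, while an unrelated top-level `fs.uri` is ignored.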
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   FsJobStatusRetrieverTest. 
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] arjun4084346 commented on issue #2560: GOBBLIN-688: Make FsJobStatusRetriever config more scoped.

2019-02-20 Thread GitBox
arjun4084346 commented on issue #2560: GOBBLIN-688: Make FsJobStatusRetriever 
config more scoped.
URL: 
https://github.com/apache/incubator-gobblin/pull/2560#issuecomment-465767426
 
 
   +1




[GitHub] asfgit closed pull request #2558: [GOBBLIN-686] Enhance schema comparison

2019-02-20 Thread GitBox
asfgit closed pull request #2558: [GOBBLIN-686] Enhance schema comparison
URL: https://github.com/apache/incubator-gobblin/pull/2558
 
 
   




[GitHub] autumnust commented on issue #2559: GOBBLIN-687: Pass TopologySpec map to DagManager to allow reuse of Sp…

2019-02-20 Thread GitBox
autumnust commented on issue #2559: GOBBLIN-687: Pass TopologySpec map to 
DagManager to allow reuse of Sp…
URL: 
https://github.com/apache/incubator-gobblin/pull/2559#issuecomment-465781772
 
 
   @sv2000 LGTM. 




[GitHub] asfgit closed pull request #2560: GOBBLIN-688: Make FsJobStatusRetriever config more scoped.

2019-02-20 Thread GitBox
asfgit closed pull request #2560: GOBBLIN-688: Make FsJobStatusRetriever config 
more scoped.
URL: https://github.com/apache/incubator-gobblin/pull/2560
 
 
   




[GitHub] asfgit closed pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.

2019-02-20 Thread GitBox
asfgit closed pull request #2555: [GOBBLIN-683] Add azkaban client retry logic.
URL: https://github.com/apache/incubator-gobblin/pull/2555
 
 
   




[GitHub] asfgit closed pull request #2559: GOBBLIN-687: Pass TopologySpec map to DagManager to allow reuse of Sp…

2019-02-20 Thread GitBox
asfgit closed pull request #2559: GOBBLIN-687: Pass TopologySpec map to 
DagManager to allow reuse of Sp…
URL: https://github.com/apache/incubator-gobblin/pull/2559
 
 
   




[GitHub] asfgit closed pull request #2539: [GOBBLIN-666] Data too long for column 'property_key'

2019-02-21 Thread GitBox
asfgit closed pull request #2539: [GOBBLIN-666] Data too long for column 
'property_key'
URL: https://github.com/apache/incubator-gobblin/pull/2539
 
 
   




[GitHub] asfgit closed pull request #2538: [GOBBLIN-669] Configuration Properties Glossary section of Docs hard to read

2019-02-21 Thread GitBox
asfgit closed pull request #2538: [GOBBLIN-669] Configuration Properties 
Glossary section of Docs hard to read
URL: https://github.com/apache/incubator-gobblin/pull/2538
 
 
   




[GitHub] asfgit closed pull request #2554: GOBBLIN-682: Create a new constructor for DatasetCleanerJob.

2019-02-21 Thread GitBox
asfgit closed pull request #2554: GOBBLIN-682: Create a new constructor for 
DatasetCleanerJob.
URL: https://github.com/apache/incubator-gobblin/pull/2554
 
 
   




[GitHub] arjun4084346 opened a new pull request #2561: [GOBBLIN-689] catch unchecked exceptions in KafkaSource

2019-02-23 Thread GitBox
arjun4084346 opened a new pull request #2561: [GOBBLIN-689] catch unchecked 
exceptions in KafkaSource
URL: https://github.com/apache/incubator-gobblin/pull/2561
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below! @htran1  please review
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-XXX
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   Workunit creation currently does not catch unchecked exceptions. This may 
lead to a partition being added to the map `partitionsToBeProcessed` without 
being added to the list `workUnits`, which is used to pack workunits into 
multiWorkUnits. As a result, ingestion does not run for some partitions, and 
the next run will not find high watermarks for those partitions, which in turn 
results in using earliestOffset and thus possible data loss.
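
One way to keep the two collections consistent is sketched below. `WorkUnitPlanner` is a hypothetical, simplified stand-in (strings instead of partitions and workunits, a set instead of the map), not the actual `KafkaSource` change: a partition is registered only after its workunit was created, and a `RuntimeException` is caught per partition.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical planner: registers a partition only if its workunit was created. */
final class WorkUnitPlanner {
  final Set<String> partitionsToBeProcessed = new HashSet<>();
  final List<String> workUnits = new ArrayList<>();
  final List<String> failedPartitions = new ArrayList<>();

  void plan(List<String> partitions, Function<String, String> createWorkUnit) {
    for (String partition : partitions) {
      try {
        String workUnit = createWorkUnit.apply(partition);
        // Both collections are updated together, after creation succeeded.
        partitionsToBeProcessed.add(partition);
        workUnits.add(workUnit);
      } catch (RuntimeException e) {
        // Unchecked failure: record it and leave both collections untouched.
        failedPartitions.add(partition);
      }
    }
  }
}
```

A failing partition then shows up in `failedPartitions` instead of being silently counted as planned.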
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   changes are trivial
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] yukuai518 opened a new pull request #2562: [GOBBLIN-690] Fix the planning job relaunch name match.

2019-02-25 Thread GitBox
yukuai518 opened a new pull request #2562: [GOBBLIN-690] Fix the planning job 
relaunch name match.
URL: https://github.com/apache/incubator-gobblin/pull/2562
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-690
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   The name match for the planning job is not correct.
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
  
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] asfgit closed pull request #2562: [GOBBLIN-690] Fix the planning job relaunch name match.

2019-02-25 Thread GitBox
asfgit closed pull request #2562: [GOBBLIN-690] Fix the planning job relaunch 
name match.
URL: https://github.com/apache/incubator-gobblin/pull/2562
 
 
   




[GitHub] autumnust opened a new pull request #2563: [GOBBLIN-691] Make format-specific component pluggable in compaction

2019-02-26 Thread GitBox
autumnust opened a new pull request #2563: [GOBBLIN-691] Make format-specific 
component pluggable in compaction 
URL: https://github.com/apache/incubator-gobblin/pull/2563
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/691) issues and references 
them in the PR title. 
   - https://issues.apache.org/jira/browse/GOBBLIN-691
   
   
   ### Description
   - As the first step of ORC compaction, this PR tries to make the current 
compaction code insensitive to the data format. Format-specific components 
should be pluggable: `CompactionJobConfigurator` becomes the abstract base 
class of `CompactionAvroJobConfigurator`, which used to be the component for 
Avro-specific logic, so that we can implement configurators for other formats 
that reuse the same dataset discovery and verification logic. All changes 
should be backward compatible. 
   - Mark the old compaction implementation as deprecated. Instead, we should 
use `CompactionSource` + `MRCompactionTask` to launch the MR job. 
   - Fix some typos found while reading through the code. 
   
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] autumnust commented on issue #2563: [GOBBLIN-691] Make format-specific component pluggable in compaction

2019-02-26 Thread GitBox
autumnust commented on issue #2563: [GOBBLIN-691] Make format-specific 
component pluggable in compaction 
URL: 
https://github.com/apache/incubator-gobblin/pull/2563#issuecomment-467668174
 
 
   @yukuai518 @ibuenros Can you help review ? Thanks. 




[GitHub] sv2000 opened a new pull request #2564: GOBBLIN-692: Add support to query last K flow executions in Gobblin-a…

2019-02-26 Thread GitBox
sv2000 opened a new pull request #2564: GOBBLIN-692: Add support to query last 
K flow executions in Gobblin-a…
URL: https://github.com/apache/incubator-gobblin/pull/2564
 
 
   …s-a-Service (GaaS).
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-692
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   Currently, REST APIs only support retrieving the latest execution of a flow. 
We enhance the APIs to query the last K executions where K is passed as a query 
parameter.
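
The "last K" selection can be sketched as below, under the assumption that execution ids grow over time (for example, timestamps) so that a larger id means a more recent execution. `LatestExecutions` is a hypothetical helper, not the retriever code from this PR.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: picks the K most recent execution ids. */
final class LatestExecutions {
  private LatestExecutions() {}

  static List<Long> lastK(List<Long> executionIds, int k) {
    if (k < 0) {
      throw new IllegalArgumentException("k must be non-negative");
    }
    return executionIds.stream()
        .sorted(Comparator.reverseOrder()) // newest first, assuming ids increase over time
        .limit(k)
        .collect(Collectors.toList());
  }
}
```

If fewer than K executions exist, all of them are returned, newest first.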
   
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   Enhance unit tests in FlowStatusTest and FsJobStatusRetrieverTest classes.
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   




[GitHub] asfgit closed pull request #2564: GOBBLIN-692: Add support to query last K flow executions in Gobblin-a…

2019-02-27 Thread GitBox
asfgit closed pull request #2564: GOBBLIN-692: Add support to query last K flow 
executions in Gobblin-a…
URL: https://github.com/apache/incubator-gobblin/pull/2564
 
 
   




[GitHub] ibuenros commented on a change in pull request #2563: [GOBBLIN-691] Make format-specific component pluggable in compaction

2019-02-28 Thread GitBox
ibuenros commented on a change in pull request #2563: [GOBBLIN-691] Make 
format-specific component pluggable in compaction 
URL: https://github.com/apache/incubator-gobblin/pull/2563#discussion_r261352026
 
 

 ##
 File path: 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/CompactorFactory.java
 ##
 @@ -29,6 +29,7 @@
 
 /**
  * A factory responsible for creating {@link Compactor}s.
+ * @deprecated
 
 Review comment:
   The `@deprecated` tag in the javadoc is intended to explain the reasons 
for the deprecation and the alternatives to this API (which you should do). 
The actual deprecation annotation (`@Deprecated`) goes outside of the javadoc.
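
A minimal sketch of the two pieces together, with hypothetical class names chosen only for illustration: the javadoc tag documents why and what to use instead, and the annotation is what the compiler and tools see.

```java
/**
 * Legacy entry point kept for backward compatibility.
 *
 * @deprecated use {@link NewCompactorLauncher} instead; this class only delegates to it.
 */
@Deprecated
final class OldCompactorLauncher {
  private OldCompactorLauncher() {}

  static String launch() {
    return NewCompactorLauncher.launch();
  }
}

/** Hypothetical replacement API named in the deprecation note. */
final class NewCompactorLauncher {
  private NewCompactorLauncher() {}

  static String launch() {
    return "launched";
  }
}
```

Because `@Deprecated` is retained at runtime, callers get a compiler warning and tools can also detect the deprecation reflectively.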




[GitHub] ibuenros commented on a change in pull request #2563: [GOBBLIN-691] Make format-specific component pluggable in compaction

2019-02-28 Thread GitBox
ibuenros commented on a change in pull request #2563: [GOBBLIN-691] Make 
format-specific component pluggable in compaction 
URL: https://github.com/apache/incubator-gobblin/pull/2563#discussion_r261407910
 
 

 ##
 File path: 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
 ##
 @@ -138,25 +106,27 @@ private Schema getKeySchema(Job job, Schema topicSchema) 
throws IOException {
 keySchema = AvroUtils.parseSchemaFromFile(keySchemaFile, this.fs);
   } catch (IOException e) {
 log.error("Failed to parse avro schema from " + keySchemaFile
-+ ", using key attributes in the schema for compaction");
-keySchema = 
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
++ ", using key attributes in the schema for compaction");
+keySchema =
+
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
   }
 
   if (!MRCompactorAvroKeyDedupJobRunner.isKeySchemaValid(keySchema, 
topicSchema)) {
 log.warn(String.format("Key schema %s is not compatible with record 
schema %s.", keySchema, topicSchema)
-+ "Using key attributes in the schema for compaction");
-keySchema = 
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
++ "Using key attributes in the schema for compaction");
+keySchema =
+
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
   }
 } else {
   log.info("Property " + 
MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_KEY_SCHEMA_LOC
-  + " not provided. Using key attributes in the schema for 
compaction");
+  + " not provided. Using key attributes in the schema for 
compaction");
   keySchema = 
AvroUtils.removeUncomparableFields(MRCompactorAvroKeyDedupJobRunner.getKeySchema(topicSchema)).get();
 }
 
 return keySchema;
   }
 
-  private void configureSchema(Job job) throws IOException {
+  protected void configureSchema(Job job) throws IOException {
 
 Review comment:
   Please add `@Override` annotation.
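
For context, `@Override` makes the compiler verify that the method really overrides one from the parent class, which matters once `configureSchema` becomes a hook of the abstract base. The sketch below uses hypothetical class names and a simplified signature, not the Gobblin classes.

```java
/** Hypothetical base class exposing a format-specific hook. */
abstract class BaseConfigurator {
  protected abstract String configureSchema(String topic);

  final String configure(String topic) {
    return configureSchema(topic);
  }
}

/** Hypothetical Avro implementation of the hook. */
final class AvroSchemaConfigurator extends BaseConfigurator {
  @Override // compile error if the base method is renamed or its signature changes
  protected String configureSchema(String topic) {
    return topic + ".avsc";
  }
}
```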




[GitHub] ibuenros commented on a change in pull request #2563: [GOBBLIN-691] Make format-specific component pluggable in compaction

2019-02-28 Thread GitBox
ibuenros commented on a change in pull request #2563: [GOBBLIN-691] Make 
format-specific component pluggable in compaction 
URL: https://github.com/apache/incubator-gobblin/pull/2563#discussion_r261408097
 
 

 ##
 File path: 
gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionJobConfigurator.java
 ##
 @@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.compaction.mapreduce;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.math3.primes.Primes;
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
+import 
org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
+import org.apache.gobblin.compaction.parser.CompactionPathParser;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+
+/**
+ * Configurator for compaction job.
+ * Different data formats should have their own impl. for this interface.
+ *
+ */
+@Slf4j
+public abstract class CompactionJobConfigurator {
+
+  public static final String COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY = 
"compaction.jobConfiguratorFactory.class";
+  public static final String DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS 
=
+  
"org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator$Factory";
+
+
+  @Getter
+  @AllArgsConstructor
+  protected enum EXTENSION {
+AVRO("avro"), ORC("orc");
+
+private String extensionString;
+  }
+
+  protected final State state;
+
+  @Getter
+  protected final FileSystem fs;
+
+  // Below attributes are MR related
+  @Getter
+  protected Job configuredJob;
+  @Getter
+  protected final boolean shouldDeduplicate;
+  @Getter
+  protected Path mrOutputPath = null;
+  @Getter
+  protected boolean isJobCreated = false;
+  @Getter
+  protected Collection mapReduceInputPaths = null;
+  @Getter
+  protected long fileNameRecordCount = 0;
+
+  public interface ConfiguratorFactory {
+CompactionJobConfigurator createConfigurator(State state) throws 
IOException;
+  }
+
+  public CompactionJobConfigurator(State state) throws IOException {
+this.state = state;
+this.fs = getFileSystem(state);
+this.shouldDeduplicate = 
state.getPropAsBoolean(MRCompactor.COMPACTION_SHOULD_DEDUPLICATE, true);
+  }
+
+  public static CompactionJobConfigurator instantiateConfigurator(State state) 
{
+String compactionConfiguratorFactoryClass =
+state.getProp(COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS_KEY, 
DEFAULT_COMPACTION_JOB_CONFIGURATOR_FACTORY_CLASS);
+try {
+  return Class.forName(compactionConfiguratorFactoryClass)
+  .asSubclass(ConfiguratorFactory.class)
+  .newInstance()
+  .createConfigurator(state);
+} catch (ReflectiveOperationException | IOException e) {
+  throw new RuntimeException("Failed to instantiate a instance of job 
configurator:", e);
+}
+  }
+
+  public abstract String getFileExtension();
+
+  /**
+   * Customized MR job crea

[GitHub] yukuai518 commented on issue #2563: [GOBBLIN-691] Make format-specific component pluggable in compaction

2019-02-28 Thread GitBox
yukuai518 commented on issue #2563: [GOBBLIN-691] Make format-specific 
component pluggable in compaction 
URL: 
https://github.com/apache/incubator-gobblin/pull/2563#issuecomment-468507094
 
 
   +1 LGTM.




[GitHub] sv2000 commented on issue #2548: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-02-28 Thread GitBox
sv2000 commented on issue #2548: [GOBBLIN-677] - Allow early termination of 
Gobblin jobs based on a predicate on the job progress
URL: 
https://github.com/apache/incubator-gobblin/pull/2548#issuecomment-468547996
 
 
   @ibuenros can you ensure that the travis build is successful before @htran1 
can merge this PR?




[GitHub] htran1 opened a new pull request #2565: [GOBBLIN-693] Add ORC hive serde manager

2019-03-01 Thread GitBox
htran1 opened a new pull request #2565: [GOBBLIN-693] Add ORC hive serde manager
URL: https://github.com/apache/incubator-gobblin/pull/2565
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [X] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-693
   
   
   ### Description
   - [X] Here are some details about my PR, including screenshots (if 
applicable):
   Created an ORC serde manager for hive registration.
   
   ### Tests
   - [X] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   
   ### Commits
   - [X] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   


[GitHub] htran1 commented on issue #2565: [GOBBLIN-693] Add ORC hive serde manager

2019-03-01 Thread GitBox
htran1 commented on issue #2565: [GOBBLIN-693] Add ORC hive serde manager
URL: 
https://github.com/apache/incubator-gobblin/pull/2565#issuecomment-468736489
 
 
   @autumnust, please take a look.


[GitHub] ibuenros commented on a change in pull request #2565: [GOBBLIN-693] Add ORC hive serde manager

2019-03-01 Thread GitBox
ibuenros commented on a change in pull request #2565: [GOBBLIN-693] Add ORC 
hive serde manager
URL: https://github.com/apache/incubator-gobblin/pull/2565#discussion_r261696977
 
 

 ##
 File path: 
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeManager.java
 ##
 @@ -79,7 +80,8 @@ protected HiveSerDeManager(State props) {
   public abstract boolean haveSameSchema(HiveRegistrationUnit unit1, 
HiveRegistrationUnit unit2) throws IOException;
 
   public enum Implementation {
-AVRO(HiveAvroSerDeManager.class.getName());
+AVRO(HiveAvroSerDeManager.class.getName()),
 
 Review comment:
   Should we use service providers instead of a hard-coded list of 
implementations? A hard-coded list prevents third parties from adding their own 
implementations.


[GitHub] htran1 commented on a change in pull request #2565: [GOBBLIN-693] Add ORC hive serde manager

2019-03-01 Thread GitBox
htran1 commented on a change in pull request #2565: [GOBBLIN-693] Add ORC hive 
serde manager
URL: https://github.com/apache/incubator-gobblin/pull/2565#discussion_r261732673
 
 

 ##
 File path: 
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeManager.java
 ##
 @@ -79,7 +80,8 @@ protected HiveSerDeManager(State props) {
   public abstract boolean haveSameSchema(HiveRegistrationUnit unit1, 
HiveRegistrationUnit unit2) throws IOException;
 
   public enum Implementation {
-AVRO(HiveAvroSerDeManager.class.getName());
+AVRO(HiveAvroSerDeManager.class.getName()),
 
 Review comment:
   The existing code falls back to treating the implementation name as a class 
name and instantiating that class by name. The implementation names here are 
just aliases for the class names.
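
The alias-with-fallback behaviour described in this comment can be sketched roughly as follows. This is a minimal illustration, not the actual Gobblin code: `AvroManager`, `OrcManager`, `SerDeResolver`, and the case-insensitive alias lookup are all hypothetical names and choices made up for the example.

```java
import java.util.Locale;

// Hypothetical stand-ins for real serde manager classes (not Gobblin classes).
class AvroManager {}
class OrcManager {}

final class SerDeResolver {
  /** Short aliases for known implementations, in the spirit of the enum under review. */
  enum Implementation {
    AVRO(AvroManager.class.getName()),
    ORC(OrcManager.class.getName());

    private final String className;

    Implementation(String className) {
      this.className = className;
    }
  }

  /** Returns the class name for a known alias; any other input is treated as a class name. */
  static String resolveClassName(String nameOrAlias) {
    try {
      // Assumption for this sketch: aliases are matched case-insensitively.
      return Implementation.valueOf(nameOrAlias.toUpperCase(Locale.ROOT)).className;
    } catch (IllegalArgumentException notAnAlias) {
      return nameOrAlias;
    }
  }

  /** Instantiates the resolved class through its no-argument constructor. */
  static Object instantiate(String nameOrAlias) {
    String className = resolveClassName(nameOrAlias);
    try {
      return Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate " + className, e);
    }
  }

  public static void main(String[] args) {
    // Resolved through the alias.
    System.out.println(instantiate("orc").getClass().getName());
    // Resolved through the class-name fallback, as a third party would do.
    System.out.println(instantiate(AvroManager.class.getName()).getClass().getName());
  }
}
```

With this shape a third party can plug in an implementation by passing its class name, while the enum only supplies convenient short names for the built-in ones.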


[GitHub] [incubator-gobblin] asfgit closed pull request #2548: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress

2019-03-04 Thread GitBox
asfgit closed pull request #2548: [GOBBLIN-677] - Allow early termination of 
Gobblin jobs based on a predicate on the job progress
URL: https://github.com/apache/incubator-gobblin/pull/2548
 
 
   


[GitHub] [incubator-gobblin] asfgit closed pull request #2563: [GOBBLIN-691] Make format-specific component pluggable in compaction

2019-03-05 Thread GitBox
asfgit closed pull request #2563: [GOBBLIN-691] Make format-specific component 
pluggable in compaction 
URL: https://github.com/apache/incubator-gobblin/pull/2563
 
 
   


[GitHub] [incubator-gobblin] autumnust opened a new pull request #2566: [GOBBLIN-695] Adding utility functions to generate Avro/ORC binary using json

2019-03-05 Thread GitBox
autumnust opened a new pull request #2566: [GOBBLIN-695] Adding utility 
functions to generate Avro/ORC binary using json
URL: https://github.com/apache/incubator-gobblin/pull/2566
 
 
   Create a new module for code that generates binary files from JSON files.
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN-695/) issues and references 
them in the PR title. For example, "[GOBBLIN-695] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-695
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   
   Use `AvroTestTools` / `OrcTestTools` to generate an Avro/ORC file from a JSON 
file and an `.avsc` schema file.
   
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   


[GitHub] [incubator-gobblin] autumnust commented on issue #2566: [GOBBLIN-695] Adding utility functions to generate Avro/ORC binary using json

2019-03-05 Thread GitBox
autumnust commented on issue #2566: [GOBBLIN-695] Adding utility functions to 
generate Avro/ORC binary using json
URL: 
https://github.com/apache/incubator-gobblin/pull/2566#issuecomment-469850988
 
 
   @htran1  @ibuenros  Please help review, thanks. 


[GitHub] [incubator-gobblin] sv2000 opened a new pull request #2567: GOBBLIN-696: Provide an "explain" option to return a compiled flow wh…

2019-03-05 Thread GitBox
sv2000 opened a new pull request #2567: GOBBLIN-696: Provide an "explain" 
option to return a compiled flow wh…
URL: https://github.com/apache/incubator-gobblin/pull/2567
 
 
   …en a flow config is added.
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-696
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   We add support for an "explain" option in Gobblin-as-a-Service (GaaS) flow 
creation requests, which returns the expected output of flow compilation. The 
"explain" option lets end users validate their FlowConfig requests by checking 
that (1) the request compiles successfully and (2) the compiled output is as 
expected. Further, the "explain" option lets users query GaaS without side 
effects, i.e., no FlowSpecs are actually created or scheduled on GaaS.
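
As a rough illustration of the "explain" semantics described above (compile and return the result, but create or schedule nothing), here is a minimal sketch. It does not use the GaaS API: `ExplainSketch`, `compile`, `addFlow`, and the string-based flow config are hypothetical names invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

final class ExplainSketch {
  // Stands in for the flows that have actually been created and scheduled.
  private final List<String> scheduled = new ArrayList<>();

  /** Toy "compilation": rejects empty configs, otherwise returns a compiled form. */
  static String compile(String flowConfig) {
    if (flowConfig == null || flowConfig.trim().isEmpty()) {
      throw new IllegalArgumentException("Flow config does not compile: empty");
    }
    return "compiled(" + flowConfig.trim() + ")";
  }

  /**
   * Compiles the flow config and returns the compiled output. With explain set,
   * nothing is stored or scheduled, so the call has no side effects.
   */
  String addFlow(String flowConfig, boolean explain) {
    String compiled = compile(flowConfig);
    if (!explain) {
      scheduled.add(compiled);
    }
    return compiled;
  }

  int scheduledCount() {
    return scheduled.size();
  }

  public static void main(String[] args) {
    ExplainSketch service = new ExplainSketch();
    System.out.println(service.addFlow("flowA", true));   // dry run
    System.out.println(service.scheduledCount());         // still nothing scheduled
    System.out.println(service.addFlow("flowA", false));  // real request
    System.out.println(service.scheduledCount());
  }
}
```

The point of routing both paths through the same compile step is that an "explain" request reports exactly what a real request would produce, including compilation failures.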
   
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   Enhanced unit tests. End-to-end testing against a locally deployed instance.
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   


[GitHub] [incubator-gobblin] sv2000 commented on issue #2567: GOBBLIN-696: Provide an "explain" option to return a compiled flow wh…

2019-03-05 Thread GitBox
sv2000 commented on issue #2567: GOBBLIN-696: Provide an "explain" option to 
return a compiled flow wh…
URL: 
https://github.com/apache/incubator-gobblin/pull/2567#issuecomment-469878966
 
 
   @yukuai518 can you review this PR?


[GitHub] [incubator-gobblin] ibuenros opened a new pull request #2568: [GOBBLIN-697] Implementation of data file versioning and preservation in distcp.

2019-03-05 Thread GitBox
ibuenros opened a new pull request #2568: [GOBBLIN-697] Implementation of data 
file versioning and preservation in distcp.
URL: https://github.com/apache/incubator-gobblin/pull/2568
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-XXX
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if 
applicable):
   
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   


[GitHub] [incubator-gobblin] ibuenros commented on issue #2568: [GOBBLIN-697] Implementation of data file versioning and preservation in distcp.

2019-03-05 Thread GitBox
ibuenros commented on issue #2568: [GOBBLIN-697] Implementation of data file 
versioning and preservation in distcp.
URL: 
https://github.com/apache/incubator-gobblin/pull/2568#issuecomment-469901739
 
 
   @yukuai518 @htran1 can you review? Thanks!


[GitHub] [incubator-gobblin] sv2000 opened a new pull request #2569: GOBBLIN-698: Enhance logging to print job and flow details when a job…

2019-03-05 Thread GitBox
sv2000 opened a new pull request #2569: GOBBLIN-698: Enhance logging to print 
job and flow details when a job…
URL: https://github.com/apache/incubator-gobblin/pull/2569
 
 
   … is orchestrated by GaaS.
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I 
have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin 
JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references 
them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   - https://issues.apache.org/jira/browse/GOBBLIN-698
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if 
applicable):
   We enhance logging in GaaS to add job and flow details when a job is 
orchestrated by GaaS.
   
   
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   Tested in a local setup.
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
   1. Subject is separated from body by a blank line
   2. Subject is limited to 50 characters
   3. Subject does not end with a period
   4. Subject uses the imperative mood ("add", not "adding")
   5. Body wraps at 72 characters
   6. Body explains "what" and "why", not "how"
   
   


[GitHub] [incubator-gobblin] asfgit closed pull request #2569: GOBBLIN-698: Enhance logging to print job and flow details when a job…

2019-03-05 Thread GitBox
asfgit closed pull request #2569: GOBBLIN-698: Enhance logging to print job and 
flow details when a job…
URL: https://github.com/apache/incubator-gobblin/pull/2569
 
 
   


[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2566: [GOBBLIN-695] Adding utility functions to generate Avro/ORC binary using json

2019-03-06 Thread GitBox
htran1 commented on a change in pull request #2566: [GOBBLIN-695] Adding 
utility functions to generate Avro/ORC binary using json
URL: https://github.com/apache/incubator-gobblin/pull/2566#discussion_r263076168
 
 

 ##
 File path: 
gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
 ##
 @@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import com.google.common.collect.AbstractIterator;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeMap;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Delegate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.mapred.FsInput;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filters.HiddenFilter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.reflections.Reflections;
+import org.reflections.scanners.ResourcesScanner;
+import org.reflections.util.ConfigurationBuilder;
+
+
+/**
+ * An implementation of {@link DataTestTools} for Avro format.
+ */
+@Slf4j
+public class AvroTestTools extends DataTestTools {
+
+  public boolean checkSameFilesAndRecords(TreeMap expected,
+      TreeMap observed, boolean allowDifferentOrder, Collection blacklistRecordFields,
+      boolean allowDifferentSchema) {
+    Iterator keys1 = expected.navigableKeySet().iterator();
+    Iterator keys2 = observed.navigableKeySet().iterator();
+
+    return compareIterators(keys1, keys2, (key1, key2) -> {
+      if (!removeExtension(key1).equals(removeExtension(key2))) {
+        log.error(String.format("Mismatched files: %s and %s", key1, key2));
+        return false;
+      }
+
+      RecordIterator it1 = expected.get(key1);
+      RecordIterator it2 = observed.get(key2);
+
+      if (!allowDifferentSchema && !it1.getSchema().equals(it2.getSchema())) {
+        log.error(String.format("Mismatched schemas: %s and %s", key1, key2));
+        return false;
+      }
+
+      if (allowDifferentOrder) {
+        Set r1 = allowDifferentSchema
+            ? toSetWithBlacklistedFields(it1, blacklistRecordFields, GenericRecordWrapper::new)
+            : toSetWithBlacklistedFields(it1, blacklistRecordFields, Function.identity());
+        Set r2 = allowDifferentSchema
+            ? toSetWithBlacklistedFields(it2, blacklistRecordFields, GenericRecordWrapper::new)
+            : toSetWithBlacklistedFields(it2, blacklistRecordFields, Function.identity());
+        if (r1.equals(r2)) {
+          return true;
+        } else {
+          log.info("Sets of records differ.");
+          return false;
+        }
+      } else {
+        return compareIterators(it1, it2, (r1, r2) -> {
+          if (blacklistRecordFields != null) {
+            for (String blacklisted : blacklistRecordFields) {
+              r1.put(blacklisted, null);
+              r2.put(blacklisted, null);
+            }
+          }
+          return allowDifferentSchema ?
+              GenericRecordWrapper.compareGenericRecordRegardlessOfSchema(r1, r2) : r1.equals(r2);
+        });
+      }
+    });
+  }
+
+  private static  Set toSetWithBlacklistedFields(Iterator it,
+      Collection blacklistRecordFields, Functio

[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2566: [GOBBLIN-695] Adding utility functions to generate Avro/ORC binary using json

2019-03-06 Thread GitBox
htran1 commented on a change in pull request #2566: [GOBBLIN-695] Adding 
utility functions to generate Avro/ORC binary using json
URL: https://github.com/apache/incubator-gobblin/pull/2566#discussion_r263095840
 
 

 ##
 File path: 
gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
 ##
 @@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import com.google.common.collect.AbstractIterator;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Delegate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filters.HiddenFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import static org.apache.gobblin.binary_creation.AvroTestTools.*;
+
+
+// A class that examines ORC-Format file in Purger Integration test.
+@Slf4j
+public class OrcTestTools extends DataTestTools {
+
+  /**
+   *
+   * @param expected
+   * @param observed
+   * @param allowDifferentOrder ORC tools will not use this parameter currently.
+   * @param blacklistRecordFields ORC tools will not use this parameter currently.
+   * @return If two sets of files are identical.
+   * Note that there might be an ordering issue in this comparison method. When one is drafting an ORC integration
+   * test, try to name all json files differently.
+   */
+  @Override
+  public boolean checkSameFilesAndRecords(TreeMap expected,
+      TreeMap observed, boolean allowDifferentOrder, Collection blacklistRecordFields,
+      boolean allowDifferentSchema) {
+    Iterator keys1 = expected.navigableKeySet().iterator();
+    Iterator keys2 = observed.navigableKeySet().iterator();
+
+    return compareIterators(keys1, keys2, (key1, key2) -> {
+      // ORC file doesn't have extension by Linkedin's convention.
+      if (!removeExtension(key1).equals(key2)) {
+        log.error(String.format("Mismatched files: %s and %s", key1, key2));
+        return false;
+      }
+
+      OrcRowIterator it1 = expected.get(key1);
+      OrcRowIterator it2 = observed.get(key2);
+
+      if (!it1.getTypeInfo().equals(it2.getTypeInfo())) {
+        log.error(String.format("Mismatched Typeinfo: %s and %s", key1, key2));
+        return false;
+      }
+
+      boolean result = true;
+      while (it1.hasNext()) {
+        if (!it2.hasNext()) {
+          retur

[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2566: [GOBBLIN-695] Adding utility functions to generate Avro/ORC binary using json

2019-03-06 Thread GitBox
htran1 commented on a change in pull request #2566: [GOBBLIN-695] Adding 
utility functions to generate Avro/ORC binary using json
URL: https://github.com/apache/incubator-gobblin/pull/2566#discussion_r263070410
 
 

 ##
 File path: 
gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
 ##
 @@ -0,0 +1,375 @@

[GitHub] [incubator-gobblin] htran1 commented on a change in pull request #2566: [GOBBLIN-695] Adding utility functions to generate Avro/ORC binary using json

2019-03-06 Thread GitBox
htran1 commented on a change in pull request #2566: [GOBBLIN-695] Adding 
utility functions to generate Avro/ORC binary using json
URL: https://github.com/apache/incubator-gobblin/pull/2566#discussion_r263094760
 
 

 ##
 File path: 
gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
 ##
 @@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import com.google.common.collect.AbstractIterator;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Delegate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filters.HiddenFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.io.orc.Writer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import static org.apache.gobblin.binary_creation.AvroTestTools.*;
+
+
+// A class that examines ORC-Format file in Purger Integration test.
+@Slf4j
+public class OrcTestTools extends DataTestTools {
+
+  /**
+   *
+   * @param expected
+   * @param observed
+   * @param allowDifferentOrder ORC tools will not use this parameter currently.
+   * @param blacklistRecordFields ORC tools will not use this parameter currently.
+   * @return If two sets of files are identical.
+   * Note that there might be an ordering issue in this comparison method. When one is drafting an ORC integration
+   * test, try to name all json files differently.
+   */
+  @Override
+  public boolean checkSameFilesAndRecords(TreeMap expected,
+  TreeMap observed, boolean allowDifferentOrder, Collection blacklistRecordFields,
+  boolean allowDifferentSchema) {
+Iterator keys1 = expected.navigableKeySet().iterator();
+Iterator keys2 = observed.navigableKeySet().iterator();
+
+return compareIterators(keys1, keys2, (key1, key2) -> {
+  // ORC file doesn't have extension by Linkedin's convention.
+  if (!removeExtension(key1).equals(key2)) {
+log.error(String.format("Mismatched files: %s and %s", key1, key2));
+return false;
+  }
+
+  OrcRowIterator it1 = expected.get(key1);
+  OrcRowIterator it2 = observed.get(key2);
+
+  if (!it1.getTypeInfo().equals(it2.getTypeInfo())) {
+log.error(String.format("Mismatched Typeinfo: %s and %s", key1, key2));
+return false;
+  }
+
+  boolean result = true;
+  while (it1.hasNext()) {
+if (!it2.hasNext()) {
+  retur
