[ 
https://issues.apache.org/jira/browse/SQOOP-1803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14345504#comment-14345504
 ] 

Jarek Jarcec Cecho commented on SQOOP-1803:
-------------------------------------------

I was thinking about this one a bit myself. I have a couple of thoughts on 
getting data back from the execution engine and I'm wondering what others 
think. Please don't hesitate to chime in if I missed any approach.

1) DistributedCache

In addition to [~gwenshap]'s comments about the supportability (and/or 
debuggability) of {{DistributedCache}}, to the best of my knowledge it can only 
be used to distribute data from the launcher (Sqoop Server) to the child 
mapreduce tasks. I do not believe it can be used the other way around to get 
files or data from individual tasks back to the launcher. Looking at the [latest 
javadocs|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/filecache/DistributedCache.html]
 this still seems to be the case, as the documentation contains a note about 
the immutability of the cache while the job is executing:

{quote}
DistributedCache tracks modification timestamps of the cache files. Clearly the 
cache files should not be modified by the application or externally while the 
job is executing.
{quote}
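
For illustration, the cache flows in one direction only: the launcher 
registers files before submission and the tasks read localized, read-only 
copies. A minimal sketch using the non-deprecated {{Job}} API (the file path 
is made up for illustration):

{code:java}
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CacheDirectionSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "sqoop-job");

    // Launcher -> tasks: files are registered before submission and are
    // treated as immutable for the lifetime of the job.
    job.addCacheFile(new URI("hdfs:///tmp/sqoop/connector-options.properties"));

    // There is no equivalent API for a task to push a file back to the
    // launcher through the cache, which is the direction we need here.
  }
}
{code}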

*Summary:* I believe that this solution is disqualified for retrieving data 
back from the execution engine.

2) Counters

[Counters|https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/Counters.html]
 are a nice technique for getting insight into what a mapreduce job is doing. 
Multiple mappers can update the counters in parallel and it's mapreduce's 
responsibility to ensure that counters from all child tasks are summed up 
correctly. The limitation of this solution is that counters can only be 
{{long}} based (e.g. no {{String}}, {{Date}}, ...). Also, the counters are 
cumulative in nature, so it might be a bit difficult to retrieve discrete 
values - we would either need to ensure that only certain mappers/reducers 
update a given counter while others don't, or we would need to figure out a 
way to decode a single value when multiple mappers/reducers update one counter 
at the same time.
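
To make the mechanics (and the {{long}}-only limitation) concrete, a rough 
sketch of a mapper incrementing a counter and the launcher reading the 
aggregated value once the job has finished; the enum and mapper are made up 
for illustration:

{code:java}
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class CounterSketch {
  // Counters are identified by an enum (or group/name strings) and can only
  // hold long values that mapreduce sums across all tasks.
  public enum SqoopCounters { ROWS_EXTRACTED }

  public static class ExtractMapper
      extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Each task increments its own counter; the framework aggregates them.
      context.getCounter(SqoopCounters.ROWS_EXTRACTED).increment(1);
      context.write(new Text("row"), value);
    }
  }

  // Launcher side, after job.waitForCompletion() returns:
  static long rowsExtracted(Job job) throws IOException {
    return job.getCounters()
        .findCounter(SqoopCounters.ROWS_EXTRACTED)
        .getValue();
  }
}
{code}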

*Summary:* While it's not impossible to use counters to retrieve data from the 
execution engine, this solution would impose limitations and would be 
"difficult" to implement and maintain.

3) HDFS Files

Looking into how others solve this problem, [Oozie|http://oozie.apache.org] 
launcher tasks (= one-map mapreduce jobs) generate files on HDFS in a 
predefined directory, from where the Oozie server picks them up to read 
arbitrary values. This is a neat solution as it allows us to retrieve any 
value of any type from any part of the workflow (all processes can create 
their own files if needed). The downside is that we would need to agree on a 
certain location where the Server and the mapreduce job will exchange files - 
this directory must exist and must be accessible by both Sqoop (running under 
the system user) and the mapreduce job itself (most likely running as the end 
user). I believe that HDFS ACLs can easily be used to accomplish this.

We would need to be careful here with edge conditions - we would need to make 
sure that we clean up old and unused files (job failures, ...) and that we are 
not leaking any sensitive information to HDFS.
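
A rough sketch of the file-based exchange (the directory layout and the 
key=value format are assumptions for illustration, not anything Oozie or 
Sqoop prescribes):

{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsExchangeSketch {
  // Agreed-upon exchange location; /sqoop/exchange/<jobId> is made up and
  // would need ACLs allowing both the Sqoop system user and the end user.
  static Path exchangeFile(String jobId) {
    return new Path("/sqoop/exchange/" + jobId + "/state.properties");
  }

  // Task side: write a small key=value file with whatever needs to go back.
  static void writeState(Configuration conf, String jobId, String maxValue)
      throws Exception {
    FileSystem fs = FileSystem.get(conf);
    try (OutputStreamWriter out = new OutputStreamWriter(
        fs.create(exchangeFile(jobId), true), StandardCharsets.UTF_8)) {
      out.write("incremental.max.value=" + maxValue + "\n");
    }
  }

  // Sqoop server side: read the value back after the job completes and
  // clean up so we don't leave stale or sensitive data on HDFS.
  static String readAndCleanup(Configuration conf, String jobId)
      throws Exception {
    FileSystem fs = FileSystem.get(conf);
    Path file = exchangeFile(jobId);
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(fs.open(file), StandardCharsets.UTF_8))) {
      return in.readLine();
    } finally {
      fs.delete(file.getParent(), true);
    }
  }
}
{code}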

*Summary:* A possible solution that would support all our use cases, but would 
be a bit harder to implement.

4) Server side exchange only

I was also looking into how things currently work in the server and I've 
realized something that made me think about this proposal. Back when we were 
defining the workflow, the intention was that only the {{Initializer}} is 
allowed to generate state, whereas all other parts of the workflow - 
{{Partitioner}}, {{Extractor}}, {{Loader}} and {{Destroyer}} - should not 
generate any state and should only reuse the one that was prepared in the 
initializer. The reason for that is that the {{Initializer}} runs only once, 
whereas all other parts of the workflow run in parallel and/or do not run on 
the Sqoop server itself; hence, by allowing state to be generated only in the 
{{Initializer}}, we don't have to deal with synchronizing the parallel pieces 
or with the limitations of various execution engines. This intention is 
reflected in the API: the {{Initializer}} is given a {{MutableContext}} where 
the connector developer can set any properties that will be shared with the 
rest of the workflow (~ the state), while all other parts are given only an 
{{ImmutableContext}} that doesn't allow any changes to the shared properties. 
I have to say that we have a small exception in the code base, because the 
{{Partitioner}} class generates {{Partition}} objects that can carry some 
context as well. However, as the {{Partition}} objects are not available in 
the {{Destroyer}}, the connector developer still needs to persist any state 
that is required throughout the entire workflow inside the {{Initializer}}.
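
To make the split concrete, a rough sketch against the 1.99.x connector SPI; 
the class and method names are from memory and may not match the code base 
exactly, and the configuration classes are placeholders:

{code:java}
import org.apache.sqoop.job.etl.Destroyer;
import org.apache.sqoop.job.etl.DestroyerContext;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;

public class ContextSplitSketch {

  // Placeholders standing in for a real connector's configuration classes.
  public static class LinkConfiguration {}
  public static class FromJobConfiguration {}

  public static class ExampleInitializer
      extends Initializer<LinkConfiguration, FromJobConfiguration> {
    @Override
    public void initialize(InitializerContext context,
                           LinkConfiguration linkConfig,
                           FromJobConfiguration jobConfig) {
      // Only the Initializer sees a MutableContext, so only it can publish
      // state for the rest of the workflow.
      context.getContext().setString("example.max.value", "42");
    }
  }

  public static class ExampleDestroyer
      extends Destroyer<LinkConfiguration, FromJobConfiguration> {
    @Override
    public void destroy(DestroyerContext context,
                        LinkConfiguration linkConfig,
                        FromJobConfiguration jobConfig) {
      // Every other phase gets only an ImmutableContext - read, don't write.
      String maxValue = context.getContext().getString("example.max.value");
      // ... use maxValue for cleanup, logging, etc. ...
    }
  }
}
{code}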

Having said that, another option seems to be to simply not retrieve anything 
from the execution engine and let the connector update the configuration 
objects based on the info that it generated in the {{Initializer}} - assuming 
that the job finished correctly. Looking at the current connectors, this 
should work well, as we need to update and persist state that is 'locked in' 
at the {{Initializer}} stage. For database-based connectors the "max value" 
should be determined in the initializer (it currently isn't, though), and the 
same goes for Kafka and other connectors. The beauty of this approach is that 
it's simple to implement and can easily be extended in the future to include 
data coming from the execution engine, should there be a need for it (for 
example, for approach 3) above).
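
Purely as an illustration of how option 4) could look: the post-job hook and 
the incremental config class below are hypothetical (essentially what this 
JIRA would introduce), not existing API:

{code:java}
public class Option4Sketch {

  // Hypothetical incremental config grouped inside the From job configuration.
  public static class IncrementalConfig {
    public String lastValue; // e.g. MAX(id) captured at Initializer time
  }

  public static class FromJobConfiguration {
    public IncrementalConfig incremental = new IncrementalConfig();
  }

  // Hypothetical hook run on the Sqoop server only after the mapreduce job
  // finished successfully. Nothing is pulled from the execution engine; the
  // value was determined once in the Initializer, before any parallel work
  // started, so there is nothing to merge or synchronize.
  static void updateJobConfiguration(FromJobConfiguration jobConfig,
                                     String maxValueFromInitializer) {
    jobConfig.incremental.lastValue = maxValueFromInitializer;
    // The server would then persist jobConfig into the repository (SQOOP-1804).
  }
}
{code}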

> JobManager and Execution Engine changes: Support for a injecting and pulling 
> out configs and job output in connectors 
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SQOOP-1803
>                 URL: https://issues.apache.org/jira/browse/SQOOP-1803
>             Project: Sqoop
>          Issue Type: Sub-task
>            Reporter: Veena Basavaraj
>            Assignee: Veena Basavaraj
>             Fix For: 1.99.6
>
>
> The details are in the design wiki, as the implementation happens more 
> discussions can happen here.
> https://cwiki.apache.org/confluence/display/SQOOP/Delta+Fetch+And+Merge+Design#DeltaFetchAndMergeDesign-Howtogetoutputfromconnectortosqoop?
> The goal is to dynamically inject a IncrementalConfig instance into the 
> FromJobConfiguration. The current MFromConfig and MToConfig can already hold 
> a list of configs, and a strong sentiment was expressed to keep it as a list, 
> why not for the first time actually make use of it and group the incremental 
> related configs in one config object
> This task will prepare the FromJobConfiguration from the job config data, 
> ExtractorContext with the relevant values from the prev job run 
> This task will prepare the ToJobConfiguration from the job config data, 
> LoaderContext with the relevant values from the prev job run if any
> We will use DistributedCache to get State information from the Extractor and 
> Loader out and finally persist it into the sqoop repository depending on 
> SQOOP-1804 once the outputcommitter commit is called



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
