http://git-wip-us.apache.org/repos/asf/hbase-site/blob/d02dd5db/devapidocs/src-html/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.SecureBulkLoadListener.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.SecureBulkLoadListener.html b/devapidocs/src-html/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.SecureBulkLoadListener.html index 03578e8..d0873aa 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.SecureBulkLoadListener.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.SecureBulkLoadListener.html @@ -300,173 +300,183 @@ <span class="sourceLineNo">292</span> new SecureBulkLoadListener(fs, bulkToken, conf));<a name="line.292"></a> <span class="sourceLineNo">293</span> } catch (Exception e) {<a name="line.293"></a> <span class="sourceLineNo">294</span> LOG.error("Failed to complete bulk load", e);<a name="line.294"></a> -<span class="sourceLineNo">295</span> }<a name="line.295"></a> -<span class="sourceLineNo">296</span> return false;<a name="line.296"></a> -<span class="sourceLineNo">297</span> }<a name="line.297"></a> -<span class="sourceLineNo">298</span> });<a name="line.298"></a> -<span class="sourceLineNo">299</span> }<a name="line.299"></a> -<span class="sourceLineNo">300</span> if (region.getCoprocessorHost() != null) {<a name="line.300"></a> -<span class="sourceLineNo">301</span> try {<a name="line.301"></a> -<span class="sourceLineNo">302</span> loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);<a name="line.302"></a> -<span class="sourceLineNo">303</span> } catch (IOException e) {<a name="line.303"></a> -<span class="sourceLineNo">304</span> ResponseConverter.setControllerException(controller, e);<a name="line.304"></a> -<span class="sourceLineNo">305</span> done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());<a name="line.305"></a> -<span class="sourceLineNo">306</span> return;<a name="line.306"></a> -<span class="sourceLineNo">307</span> }<a name="line.307"></a> -<span class="sourceLineNo">308</span> }<a name="line.308"></a> -<span class="sourceLineNo">309</span> done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());<a name="line.309"></a> -<span class="sourceLineNo">310</span> }<a name="line.310"></a> -<span class="sourceLineNo">311</span><a name="line.311"></a> -<span class="sourceLineNo">312</span> private List<BulkLoadObserver> getBulkLoadObservers() {<a name="line.312"></a> -<span class="sourceLineNo">313</span> List<BulkLoadObserver> coprocessorList =<a name="line.313"></a> -<span class="sourceLineNo">314</span> this.env.getRegion().getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);<a name="line.314"></a> -<span class="sourceLineNo">315</span><a name="line.315"></a> -<span class="sourceLineNo">316</span> return coprocessorList;<a name="line.316"></a> -<span class="sourceLineNo">317</span> }<a name="line.317"></a> -<span class="sourceLineNo">318</span><a name="line.318"></a> -<span class="sourceLineNo">319</span> private Path createStagingDir(Path baseDir,<a name="line.319"></a> -<span class="sourceLineNo">320</span> User user,<a name="line.320"></a> -<span class="sourceLineNo">321</span> TableName tableName) throws IOException {<a name="line.321"></a> -<span class="sourceLineNo">322</span> String tblName = tableName.getNameAsString().replace(":", "_");<a name="line.322"></a> -<span class="sourceLineNo">323</span> String randomDir = user.getShortName()+"__"+ tblName +"__"+<a name="line.323"></a> -<span class="sourceLineNo">324</span> (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));<a name="line.324"></a> -<span class="sourceLineNo">325</span> return createStagingDir(baseDir, user, randomDir);<a name="line.325"></a> -<span class="sourceLineNo">326</span> }<a name="line.326"></a> -<span class="sourceLineNo">327</span><a name="line.327"></a> -<span class="sourceLineNo">328</span> private Path createStagingDir(Path baseDir,<a name="line.328"></a> -<span class="sourceLineNo">329</span> User user,<a name="line.329"></a> -<span class="sourceLineNo">330</span> String randomDir) throws IOException {<a name="line.330"></a> -<span class="sourceLineNo">331</span> Path p = new Path(baseDir, randomDir);<a name="line.331"></a> -<span class="sourceLineNo">332</span> fs.mkdirs(p, PERM_ALL_ACCESS);<a name="line.332"></a> -<span class="sourceLineNo">333</span> fs.setPermission(p, PERM_ALL_ACCESS);<a name="line.333"></a> -<span class="sourceLineNo">334</span> return p;<a name="line.334"></a> -<span class="sourceLineNo">335</span> }<a name="line.335"></a> -<span class="sourceLineNo">336</span><a name="line.336"></a> -<span class="sourceLineNo">337</span> private User getActiveUser() {<a name="line.337"></a> -<span class="sourceLineNo">338</span> User user = RpcServer.getRequestUser();<a name="line.338"></a> -<span class="sourceLineNo">339</span> if (user == null) {<a name="line.339"></a> -<span class="sourceLineNo">340</span> return null;<a name="line.340"></a> -<span class="sourceLineNo">341</span> }<a name="line.341"></a> -<span class="sourceLineNo">342</span><a name="line.342"></a> -<span class="sourceLineNo">343</span> //this is for testing<a name="line.343"></a> -<span class="sourceLineNo">344</span> if (userProvider.isHadoopSecurityEnabled()<a name="line.344"></a> -<span class="sourceLineNo">345</span> && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {<a name="line.345"></a> -<span class="sourceLineNo">346</span> return User.createUserForTesting(conf, user.getShortName(), new String[]{});<a name="line.346"></a> -<span class="sourceLineNo">347</span> }<a name="line.347"></a> -<span class="sourceLineNo">348</span><a name="line.348"></a> -<span class="sourceLineNo">349</span> return user;<a name="line.349"></a> -<span class="sourceLineNo">350</span> }<a name="line.350"></a> -<span class="sourceLineNo">351</span><a name="line.351"></a> -<span class="sourceLineNo">352</span> @Override<a name="line.352"></a> -<span class="sourceLineNo">353</span> public Service getService() {<a name="line.353"></a> -<span class="sourceLineNo">354</span> return this;<a name="line.354"></a> -<span class="sourceLineNo">355</span> }<a name="line.355"></a> -<span class="sourceLineNo">356</span><a name="line.356"></a> -<span class="sourceLineNo">357</span> private static class SecureBulkLoadListener implements BulkLoadListener {<a name="line.357"></a> -<span class="sourceLineNo">358</span> // Target filesystem<a name="line.358"></a> -<span class="sourceLineNo">359</span> private FileSystem fs;<a name="line.359"></a> -<span class="sourceLineNo">360</span> private String stagingDir;<a name="line.360"></a> -<span class="sourceLineNo">361</span> private Configuration conf;<a name="line.361"></a> -<span class="sourceLineNo">362</span> // Source filesystem<a name="line.362"></a> -<span class="sourceLineNo">363</span> private FileSystem srcFs = null;<a name="line.363"></a> -<span class="sourceLineNo">364</span> private Map<String, FsPermission> origPermissions = null;<a name="line.364"></a> -<span class="sourceLineNo">365</span><a name="line.365"></a> -<span class="sourceLineNo">366</span> public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {<a name="line.366"></a> -<span class="sourceLineNo">367</span> this.fs = fs;<a name="line.367"></a> -<span class="sourceLineNo">368</span> this.stagingDir = stagingDir;<a name="line.368"></a> -<span class="sourceLineNo">369</span> this.conf = conf;<a name="line.369"></a> -<span class="sourceLineNo">370</span> this.origPermissions = new HashMap<String, FsPermission>();<a name="line.370"></a> -<span class="sourceLineNo">371</span> }<a name="line.371"></a> -<span class="sourceLineNo">372</span><a name="line.372"></a> -<span class="sourceLineNo">373</span> @Override<a name="line.373"></a> -<span class="sourceLineNo">374</span> public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {<a name="line.374"></a> -<span class="sourceLineNo">375</span> Path p = new Path(srcPath);<a name="line.375"></a> -<span class="sourceLineNo">376</span> Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));<a name="line.376"></a> -<span class="sourceLineNo">377</span><a name="line.377"></a> -<span class="sourceLineNo">378</span> // In case of Replication for bulk load files, hfiles are already copied in staging directory<a name="line.378"></a> -<span class="sourceLineNo">379</span> if (p.equals(stageP)) {<a name="line.379"></a> -<span class="sourceLineNo">380</span> LOG.debug(p.getName()<a name="line.380"></a> -<span class="sourceLineNo">381</span> + " is already available in staging directory. Skipping copy or rename.");<a name="line.381"></a> -<span class="sourceLineNo">382</span> return stageP.toString();<a name="line.382"></a> -<span class="sourceLineNo">383</span> }<a name="line.383"></a> -<span class="sourceLineNo">384</span><a name="line.384"></a> -<span class="sourceLineNo">385</span> if (srcFs == null) {<a name="line.385"></a> -<span class="sourceLineNo">386</span> srcFs = FileSystem.get(p.toUri(), conf);<a name="line.386"></a> -<span class="sourceLineNo">387</span> }<a name="line.387"></a> -<span class="sourceLineNo">388</span><a name="line.388"></a> -<span class="sourceLineNo">389</span> if(!isFile(p)) {<a name="line.389"></a> -<span class="sourceLineNo">390</span> throw new IOException("Path does not reference a file: " + p);<a name="line.390"></a> -<span class="sourceLineNo">391</span> }<a name="line.391"></a> -<span class="sourceLineNo">392</span><a name="line.392"></a> -<span class="sourceLineNo">393</span> // Check to see if the source and target filesystems are the same<a name="line.393"></a> -<span class="sourceLineNo">394</span> if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {<a name="line.394"></a> -<span class="sourceLineNo">395</span> LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +<a name="line.395"></a> -<span class="sourceLineNo">396</span> "the destination filesystem. Copying file over to destination staging dir.");<a name="line.396"></a> -<span class="sourceLineNo">397</span> FileUtil.copy(srcFs, p, fs, stageP, false, conf);<a name="line.397"></a> -<span class="sourceLineNo">398</span> } else {<a name="line.398"></a> -<span class="sourceLineNo">399</span> LOG.debug("Moving " + p + " to " + stageP);<a name="line.399"></a> -<span class="sourceLineNo">400</span> FileStatus origFileStatus = fs.getFileStatus(p);<a name="line.400"></a> -<span class="sourceLineNo">401</span> origPermissions.put(srcPath, origFileStatus.getPermission());<a name="line.401"></a> -<span class="sourceLineNo">402</span> if(!fs.rename(p, stageP)) {<a name="line.402"></a> -<span class="sourceLineNo">403</span> throw new IOException("Failed to move HFile: " + p + " to " + stageP);<a name="line.403"></a> -<span class="sourceLineNo">404</span> }<a name="line.404"></a> -<span class="sourceLineNo">405</span> }<a name="line.405"></a> -<span class="sourceLineNo">406</span> fs.setPermission(stageP, PERM_ALL_ACCESS);<a name="line.406"></a> -<span class="sourceLineNo">407</span> return stageP.toString();<a name="line.407"></a> -<span class="sourceLineNo">408</span> }<a name="line.408"></a> -<span class="sourceLineNo">409</span><a name="line.409"></a> -<span class="sourceLineNo">410</span> @Override<a name="line.410"></a> -<span class="sourceLineNo">411</span> public void doneBulkLoad(byte[] family, String srcPath) throws IOException {<a name="line.411"></a> -<span class="sourceLineNo">412</span> LOG.debug("Bulk Load done for: " + srcPath);<a name="line.412"></a> -<span class="sourceLineNo">413</span> }<a name="line.413"></a> -<span class="sourceLineNo">414</span><a name="line.414"></a> -<span class="sourceLineNo">415</span> @Override<a name="line.415"></a> -<span class="sourceLineNo">416</span> public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {<a name="line.416"></a> -<span class="sourceLineNo">417</span> if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {<a name="line.417"></a> -<span class="sourceLineNo">418</span> // files are copied so no need to move them back<a name="line.418"></a> -<span class="sourceLineNo">419</span> return;<a name="line.419"></a> -<span class="sourceLineNo">420</span> }<a name="line.420"></a> -<span class="sourceLineNo">421</span> Path p = new Path(srcPath);<a name="line.421"></a> -<span class="sourceLineNo">422</span> Path stageP = new Path(stagingDir,<a name="line.422"></a> -<span class="sourceLineNo">423</span> new Path(Bytes.toString(family), p.getName()));<a name="line.423"></a> +<span class="sourceLineNo">295</span> } finally {<a name="line.295"></a> +<span class="sourceLineNo">296</span> if (fs != null) {<a name="line.296"></a> +<span class="sourceLineNo">297</span> try {<a name="line.297"></a> +<span class="sourceLineNo">298</span> if(!UserGroupInformation.getCurrentUser().equals(ugi)) {<a name="line.298"></a> +<span class="sourceLineNo">299</span> FileSystem.closeAllForUGI(ugi);<a name="line.299"></a> +<span class="sourceLineNo">300</span> }<a name="line.300"></a> +<span class="sourceLineNo">301</span> } catch (IOException e) {<a name="line.301"></a> +<span class="sourceLineNo">302</span> LOG.error("Failed to close FileSystem for " + ugi.getUserName(), e);<a name="line.302"></a> +<span class="sourceLineNo">303</span> }<a name="line.303"></a> +<span class="sourceLineNo">304</span> }<a name="line.304"></a> +<span class="sourceLineNo">305</span> }<a name="line.305"></a> +<span class="sourceLineNo">306</span> return false;<a name="line.306"></a> +<span class="sourceLineNo">307</span> }<a name="line.307"></a> +<span class="sourceLineNo">308</span> });<a name="line.308"></a> +<span class="sourceLineNo">309</span> }<a name="line.309"></a> +<span class="sourceLineNo">310</span> if (region.getCoprocessorHost() != null) {<a name="line.310"></a> +<span class="sourceLineNo">311</span> try {<a name="line.311"></a> +<span class="sourceLineNo">312</span> loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);<a name="line.312"></a> +<span class="sourceLineNo">313</span> } catch (IOException e) {<a name="line.313"></a> +<span class="sourceLineNo">314</span> ResponseConverter.setControllerException(controller, e);<a name="line.314"></a> +<span class="sourceLineNo">315</span> done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());<a name="line.315"></a> +<span class="sourceLineNo">316</span> return;<a name="line.316"></a> +<span class="sourceLineNo">317</span> }<a name="line.317"></a> +<span class="sourceLineNo">318</span> }<a name="line.318"></a> +<span class="sourceLineNo">319</span> done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());<a name="line.319"></a> +<span class="sourceLineNo">320</span> }<a name="line.320"></a> +<span class="sourceLineNo">321</span><a name="line.321"></a> +<span class="sourceLineNo">322</span> private List<BulkLoadObserver> getBulkLoadObservers() {<a name="line.322"></a> +<span class="sourceLineNo">323</span> List<BulkLoadObserver> coprocessorList =<a name="line.323"></a> +<span class="sourceLineNo">324</span> this.env.getRegion().getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);<a name="line.324"></a> +<span class="sourceLineNo">325</span><a name="line.325"></a> +<span class="sourceLineNo">326</span> return coprocessorList;<a name="line.326"></a> +<span class="sourceLineNo">327</span> }<a name="line.327"></a> +<span class="sourceLineNo">328</span><a name="line.328"></a> +<span class="sourceLineNo">329</span> private Path createStagingDir(Path baseDir,<a name="line.329"></a> +<span class="sourceLineNo">330</span> User user,<a name="line.330"></a> +<span class="sourceLineNo">331</span> TableName tableName) throws IOException {<a name="line.331"></a> +<span class="sourceLineNo">332</span> String tblName = tableName.getNameAsString().replace(":", "_");<a name="line.332"></a> +<span class="sourceLineNo">333</span> String randomDir = user.getShortName()+"__"+ tblName +"__"+<a name="line.333"></a> +<span class="sourceLineNo">334</span> (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));<a name="line.334"></a> +<span class="sourceLineNo">335</span> return createStagingDir(baseDir, user, randomDir);<a name="line.335"></a> +<span class="sourceLineNo">336</span> }<a name="line.336"></a> +<span class="sourceLineNo">337</span><a name="line.337"></a> +<span class="sourceLineNo">338</span> private Path createStagingDir(Path baseDir,<a name="line.338"></a> +<span class="sourceLineNo">339</span> User user,<a name="line.339"></a> +<span class="sourceLineNo">340</span> String randomDir) throws IOException {<a name="line.340"></a> +<span class="sourceLineNo">341</span> Path p = new Path(baseDir, randomDir);<a name="line.341"></a> +<span class="sourceLineNo">342</span> fs.mkdirs(p, PERM_ALL_ACCESS);<a name="line.342"></a> +<span class="sourceLineNo">343</span> fs.setPermission(p, PERM_ALL_ACCESS);<a name="line.343"></a> +<span class="sourceLineNo">344</span> return p;<a name="line.344"></a> +<span class="sourceLineNo">345</span> }<a name="line.345"></a> +<span class="sourceLineNo">346</span><a name="line.346"></a> +<span class="sourceLineNo">347</span> private User getActiveUser() {<a name="line.347"></a> +<span class="sourceLineNo">348</span> User user = RpcServer.getRequestUser();<a name="line.348"></a> +<span class="sourceLineNo">349</span> if (user == null) {<a name="line.349"></a> +<span class="sourceLineNo">350</span> return null;<a name="line.350"></a> +<span class="sourceLineNo">351</span> }<a name="line.351"></a> +<span class="sourceLineNo">352</span><a name="line.352"></a> +<span class="sourceLineNo">353</span> //this is for testing<a name="line.353"></a> +<span class="sourceLineNo">354</span> if (userProvider.isHadoopSecurityEnabled()<a name="line.354"></a> +<span class="sourceLineNo">355</span> && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {<a name="line.355"></a> +<span class="sourceLineNo">356</span> return User.createUserForTesting(conf, user.getShortName(), new String[]{});<a name="line.356"></a> +<span class="sourceLineNo">357</span> }<a name="line.357"></a> +<span class="sourceLineNo">358</span><a name="line.358"></a> +<span class="sourceLineNo">359</span> return user;<a name="line.359"></a> +<span class="sourceLineNo">360</span> }<a name="line.360"></a> +<span class="sourceLineNo">361</span><a name="line.361"></a> +<span class="sourceLineNo">362</span> @Override<a name="line.362"></a> +<span class="sourceLineNo">363</span> public Service getService() {<a name="line.363"></a> +<span class="sourceLineNo">364</span> return this;<a name="line.364"></a> +<span class="sourceLineNo">365</span> }<a name="line.365"></a> +<span class="sourceLineNo">366</span><a name="line.366"></a> +<span class="sourceLineNo">367</span> private static class SecureBulkLoadListener implements BulkLoadListener {<a name="line.367"></a> +<span class="sourceLineNo">368</span> // Target filesystem<a name="line.368"></a> +<span class="sourceLineNo">369</span> private FileSystem fs;<a name="line.369"></a> +<span class="sourceLineNo">370</span> private String stagingDir;<a name="line.370"></a> +<span class="sourceLineNo">371</span> private Configuration conf;<a name="line.371"></a> +<span class="sourceLineNo">372</span> // Source filesystem<a name="line.372"></a> +<span class="sourceLineNo">373</span> private FileSystem srcFs = null;<a name="line.373"></a> +<span class="sourceLineNo">374</span> private Map<String, FsPermission> origPermissions = null;<a name="line.374"></a> +<span class="sourceLineNo">375</span><a name="line.375"></a> +<span class="sourceLineNo">376</span> public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {<a name="line.376"></a> +<span class="sourceLineNo">377</span> this.fs = fs;<a name="line.377"></a> +<span class="sourceLineNo">378</span> this.stagingDir = stagingDir;<a name="line.378"></a> +<span class="sourceLineNo">379</span> this.conf = conf;<a name="line.379"></a> +<span class="sourceLineNo">380</span> this.origPermissions = new HashMap<String, FsPermission>();<a name="line.380"></a> +<span class="sourceLineNo">381</span> }<a name="line.381"></a> +<span class="sourceLineNo">382</span><a name="line.382"></a> +<span class="sourceLineNo">383</span> @Override<a name="line.383"></a> +<span class="sourceLineNo">384</span> public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {<a name="line.384"></a> +<span class="sourceLineNo">385</span> Path p = new Path(srcPath);<a name="line.385"></a> +<span class="sourceLineNo">386</span> Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));<a name="line.386"></a> +<span class="sourceLineNo">387</span><a name="line.387"></a> +<span class="sourceLineNo">388</span> // In case of Replication for bulk load files, hfiles are already copied in staging directory<a name="line.388"></a> +<span class="sourceLineNo">389</span> if (p.equals(stageP)) {<a name="line.389"></a> +<span class="sourceLineNo">390</span> LOG.debug(p.getName()<a name="line.390"></a> +<span class="sourceLineNo">391</span> + " is already available in staging directory. Skipping copy or rename.");<a name="line.391"></a> +<span class="sourceLineNo">392</span> return stageP.toString();<a name="line.392"></a> +<span class="sourceLineNo">393</span> }<a name="line.393"></a> +<span class="sourceLineNo">394</span><a name="line.394"></a> +<span class="sourceLineNo">395</span> if (srcFs == null) {<a name="line.395"></a> +<span class="sourceLineNo">396</span> srcFs = FileSystem.get(p.toUri(), conf);<a name="line.396"></a> +<span class="sourceLineNo">397</span> }<a name="line.397"></a> +<span class="sourceLineNo">398</span><a name="line.398"></a> +<span class="sourceLineNo">399</span> if(!isFile(p)) {<a name="line.399"></a> +<span class="sourceLineNo">400</span> throw new IOException("Path does not reference a file: " + p);<a name="line.400"></a> +<span class="sourceLineNo">401</span> }<a name="line.401"></a> +<span class="sourceLineNo">402</span><a name="line.402"></a> +<span class="sourceLineNo">403</span> // Check to see if the source and target filesystems are the same<a name="line.403"></a> +<span class="sourceLineNo">404</span> if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {<a name="line.404"></a> +<span class="sourceLineNo">405</span> LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +<a name="line.405"></a> +<span class="sourceLineNo">406</span> "the destination filesystem. Copying file over to destination staging dir.");<a name="line.406"></a> +<span class="sourceLineNo">407</span> FileUtil.copy(srcFs, p, fs, stageP, false, conf);<a name="line.407"></a> +<span class="sourceLineNo">408</span> } else {<a name="line.408"></a> +<span class="sourceLineNo">409</span> LOG.debug("Moving " + p + " to " + stageP);<a name="line.409"></a> +<span class="sourceLineNo">410</span> FileStatus origFileStatus = fs.getFileStatus(p);<a name="line.410"></a> +<span class="sourceLineNo">411</span> origPermissions.put(srcPath, origFileStatus.getPermission());<a name="line.411"></a> +<span class="sourceLineNo">412</span> if(!fs.rename(p, stageP)) {<a name="line.412"></a> +<span class="sourceLineNo">413</span> throw new IOException("Failed to move HFile: " + p + " to " + stageP);<a name="line.413"></a> +<span class="sourceLineNo">414</span> }<a name="line.414"></a> +<span class="sourceLineNo">415</span> }<a name="line.415"></a> +<span class="sourceLineNo">416</span> fs.setPermission(stageP, PERM_ALL_ACCESS);<a name="line.416"></a> +<span class="sourceLineNo">417</span> return stageP.toString();<a name="line.417"></a> +<span class="sourceLineNo">418</span> }<a name="line.418"></a> +<span class="sourceLineNo">419</span><a name="line.419"></a> +<span class="sourceLineNo">420</span> @Override<a name="line.420"></a> +<span class="sourceLineNo">421</span> public void doneBulkLoad(byte[] family, String srcPath) throws IOException {<a name="line.421"></a> +<span class="sourceLineNo">422</span> LOG.debug("Bulk Load done for: " + srcPath);<a name="line.422"></a> +<span class="sourceLineNo">423</span> }<a name="line.423"></a> <span class="sourceLineNo">424</span><a name="line.424"></a> -<span class="sourceLineNo">425</span> // In case of Replication for bulk load files, hfiles are not renamed by end point during<a name="line.425"></a> -<span class="sourceLineNo">426</span> // prepare stage, so no need of rename here again<a name="line.426"></a> -<span class="sourceLineNo">427</span> if (p.equals(stageP)) {<a name="line.427"></a> -<span class="sourceLineNo">428</span> LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");<a name="line.428"></a> +<span class="sourceLineNo">425</span> @Override<a name="line.425"></a> +<span class="sourceLineNo">426</span> public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {<a name="line.426"></a> +<span class="sourceLineNo">427</span> if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {<a name="line.427"></a> +<span class="sourceLineNo">428</span> // files are copied so no need to move them back<a name="line.428"></a> <span class="sourceLineNo">429</span> return;<a name="line.429"></a> <span class="sourceLineNo">430</span> }<a name="line.430"></a> -<span class="sourceLineNo">431</span><a name="line.431"></a> -<span class="sourceLineNo">432</span> LOG.debug("Moving " + stageP + " back to " + p);<a name="line.432"></a> -<span class="sourceLineNo">433</span> if(!fs.rename(stageP, p))<a name="line.433"></a> -<span class="sourceLineNo">434</span> throw new IOException("Failed to move HFile: " + stageP + " to " + p);<a name="line.434"></a> -<span class="sourceLineNo">435</span><a name="line.435"></a> -<span class="sourceLineNo">436</span> // restore original permission<a name="line.436"></a> -<span class="sourceLineNo">437</span> if (origPermissions.containsKey(srcPath)) {<a name="line.437"></a> -<span class="sourceLineNo">438</span> fs.setPermission(p, origPermissions.get(srcPath));<a name="line.438"></a> -<span class="sourceLineNo">439</span> } else {<a name="line.439"></a> -<span class="sourceLineNo">440</span> LOG.warn("Can't find previous permission for path=" + srcPath);<a name="line.440"></a> -<span class="sourceLineNo">441</span> }<a name="line.441"></a> -<span class="sourceLineNo">442</span> }<a name="line.442"></a> -<span class="sourceLineNo">443</span><a name="line.443"></a> -<span class="sourceLineNo">444</span> /**<a name="line.444"></a> -<span class="sourceLineNo">445</span> * Check if the path is referencing a file.<a name="line.445"></a> -<span class="sourceLineNo">446</span> * This is mainly needed to avoid symlinks.<a name="line.446"></a> -<span class="sourceLineNo">447</span> * @param p<a name="line.447"></a> -<span class="sourceLineNo">448</span> * @return true if the p is a file<a name="line.448"></a> -<span class="sourceLineNo">449</span> * @throws IOException<a name="line.449"></a> -<span class="sourceLineNo">450</span> */<a name="line.450"></a> -<span class="sourceLineNo">451</span> private boolean isFile(Path p) throws IOException {<a name="line.451"></a> -<span class="sourceLineNo">452</span> FileStatus status = srcFs.getFileStatus(p);<a name="line.452"></a> -<span class="sourceLineNo">453</span> boolean isFile = !status.isDirectory();<a name="line.453"></a> -<span class="sourceLineNo">454</span> try {<a name="line.454"></a> -<span class="sourceLineNo">455</span> isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);<a name="line.455"></a> -<span class="sourceLineNo">456</span> } catch (Exception e) {<a name="line.456"></a> -<span class="sourceLineNo">457</span> }<a name="line.457"></a> -<span class="sourceLineNo">458</span> return isFile;<a name="line.458"></a> -<span class="sourceLineNo">459</span> }<a name="line.459"></a> -<span class="sourceLineNo">460</span> }<a name="line.460"></a> -<span class="sourceLineNo">461</span>}<a name="line.461"></a> +<span class="sourceLineNo">431</span> Path p = new Path(srcPath);<a name="line.431"></a> +<span class="sourceLineNo">432</span> Path stageP = new Path(stagingDir,<a name="line.432"></a> +<span class="sourceLineNo">433</span> new Path(Bytes.toString(family), p.getName()));<a name="line.433"></a> +<span class="sourceLineNo">434</span><a name="line.434"></a> +<span class="sourceLineNo">435</span> // In case of Replication for bulk load files, hfiles are not renamed by end point during<a name="line.435"></a> +<span class="sourceLineNo">436</span> // prepare stage, so no need of rename here again<a name="line.436"></a> +<span class="sourceLineNo">437</span> if (p.equals(stageP)) {<a name="line.437"></a> +<span class="sourceLineNo">438</span> LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");<a name="line.438"></a> +<span class="sourceLineNo">439</span> return;<a name="line.439"></a> +<span class="sourceLineNo">440</span> }<a name="line.440"></a> +<span class="sourceLineNo">441</span><a name="line.441"></a> +<span class="sourceLineNo">442</span> LOG.debug("Moving " + stageP + " back to " + p);<a name="line.442"></a> +<span class="sourceLineNo">443</span> if(!fs.rename(stageP, p))<a name="line.443"></a> +<span class="sourceLineNo">444</span> throw new IOException("Failed to move HFile: " + stageP + " to " + p);<a name="line.444"></a> +<span class="sourceLineNo">445</span><a name="line.445"></a> +<span class="sourceLineNo">446</span> // restore original permission<a name="line.446"></a> +<span class="sourceLineNo">447</span> if (origPermissions.containsKey(srcPath)) {<a name="line.447"></a> +<span class="sourceLineNo">448</span> fs.setPermission(p, origPermissions.get(srcPath));<a name="line.448"></a> +<span class="sourceLineNo">449</span> } else {<a name="line.449"></a> +<span class="sourceLineNo">450</span> LOG.warn("Can't find previous permission for path=" + srcPath);<a name="line.450"></a> +<span class="sourceLineNo">451</span> }<a name="line.451"></a> +<span class="sourceLineNo">452</span> }<a name="line.452"></a> +<span class="sourceLineNo">453</span><a name="line.453"></a> +<span class="sourceLineNo">454</span> /**<a name="line.454"></a> +<span class="sourceLineNo">455</span> * Check if the path is referencing a file.<a name="line.455"></a> +<span class="sourceLineNo">456</span> * This is mainly needed to avoid symlinks.<a name="line.456"></a> +<span class="sourceLineNo">457</span> * @param p<a name="line.457"></a> +<span class="sourceLineNo">458</span> * @return true if the p is a file<a name="line.458"></a> +<span class="sourceLineNo">459</span> * @throws IOException<a name="line.459"></a> +<span class="sourceLineNo">460</span> */<a name="line.460"></a> +<span class="sourceLineNo">461</span> private boolean isFile(Path p) throws IOException {<a name="line.461"></a> +<span class="sourceLineNo">462</span> FileStatus status = srcFs.getFileStatus(p);<a name="line.462"></a> +<span class="sourceLineNo">463</span> boolean isFile = !status.isDirectory();<a name="line.463"></a> +<span class="sourceLineNo">464</span> try {<a name="line.464"></a> +<span class="sourceLineNo">465</span> isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);<a name="line.465"></a> +<span class="sourceLineNo">466</span> } catch (Exception e) {<a name="line.466"></a> +<span class="sourceLineNo">467</span> }<a name="line.467"></a> +<span class="sourceLineNo">468</span> return isFile;<a name="line.468"></a> +<span class="sourceLineNo">469</span> }<a name="line.469"></a> +<span class="sourceLineNo">470</span> }<a name="line.470"></a> +<span class="sourceLineNo">471</span>}<a name="line.471"></a>
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/d02dd5db/devapidocs/src-html/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.html ---------------------------------------------------------------------- diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.html b/devapidocs/src-html/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.html index 03578e8..d0873aa 100644 --- a/devapidocs/src-html/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.html +++ b/devapidocs/src-html/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.html @@ -300,173 +300,183 @@ <span class="sourceLineNo">292</span> new SecureBulkLoadListener(fs, bulkToken, conf));<a name="line.292"></a> <span class="sourceLineNo">293</span> } catch (Exception e) {<a name="line.293"></a> <span class="sourceLineNo">294</span> LOG.error("Failed to complete bulk load", e);<a name="line.294"></a> -<span class="sourceLineNo">295</span> }<a name="line.295"></a> -<span class="sourceLineNo">296</span> return false;<a name="line.296"></a> -<span class="sourceLineNo">297</span> }<a name="line.297"></a> -<span class="sourceLineNo">298</span> });<a name="line.298"></a> -<span class="sourceLineNo">299</span> }<a name="line.299"></a> -<span class="sourceLineNo">300</span> if (region.getCoprocessorHost() != null) {<a name="line.300"></a> -<span class="sourceLineNo">301</span> try {<a name="line.301"></a> -<span class="sourceLineNo">302</span> loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);<a name="line.302"></a> -<span class="sourceLineNo">303</span> } catch (IOException e) {<a name="line.303"></a> -<span class="sourceLineNo">304</span> ResponseConverter.setControllerException(controller, e);<a name="line.304"></a> -<span class="sourceLineNo">305</span> done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());<a name="line.305"></a> -<span class="sourceLineNo">306</span> return;<a name="line.306"></a> -<span class="sourceLineNo">307</span> }<a name="line.307"></a> -<span class="sourceLineNo">308</span> }<a name="line.308"></a> -<span class="sourceLineNo">309</span> done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());<a name="line.309"></a> -<span class="sourceLineNo">310</span> }<a name="line.310"></a> -<span class="sourceLineNo">311</span><a name="line.311"></a> -<span class="sourceLineNo">312</span> private List<BulkLoadObserver> getBulkLoadObservers() {<a name="line.312"></a> -<span class="sourceLineNo">313</span> List<BulkLoadObserver> coprocessorList =<a name="line.313"></a> -<span class="sourceLineNo">314</span> this.env.getRegion().getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);<a name="line.314"></a> -<span class="sourceLineNo">315</span><a name="line.315"></a> -<span class="sourceLineNo">316</span> return coprocessorList;<a name="line.316"></a> -<span class="sourceLineNo">317</span> }<a name="line.317"></a> -<span class="sourceLineNo">318</span><a name="line.318"></a> -<span class="sourceLineNo">319</span> private Path createStagingDir(Path baseDir,<a name="line.319"></a> -<span class="sourceLineNo">320</span> User user,<a name="line.320"></a> -<span class="sourceLineNo">321</span> TableName tableName) throws IOException {<a name="line.321"></a> -<span class="sourceLineNo">322</span> String tblName = tableName.getNameAsString().replace(":", "_");<a name="line.322"></a> -<span class="sourceLineNo">323</span> String randomDir = user.getShortName()+"__"+ tblName +"__"+<a name="line.323"></a> -<span class="sourceLineNo">324</span> (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));<a name="line.324"></a> -<span class="sourceLineNo">325</span> return createStagingDir(baseDir, user, randomDir);<a name="line.325"></a> -<span class="sourceLineNo">326</span> }<a name="line.326"></a> -<span class="sourceLineNo">327</span><a name="line.327"></a> -<span class="sourceLineNo">328</span> private Path createStagingDir(Path baseDir,<a name="line.328"></a> -<span class="sourceLineNo">329</span> User user,<a name="line.329"></a> -<span class="sourceLineNo">330</span> String randomDir) throws IOException {<a name="line.330"></a> -<span class="sourceLineNo">331</span> Path p = new Path(baseDir, randomDir);<a name="line.331"></a> -<span class="sourceLineNo">332</span> fs.mkdirs(p, PERM_ALL_ACCESS);<a name="line.332"></a> -<span class="sourceLineNo">333</span> fs.setPermission(p, PERM_ALL_ACCESS);<a name="line.333"></a> -<span class="sourceLineNo">334</span> return p;<a name="line.334"></a> -<span class="sourceLineNo">335</span> }<a name="line.335"></a> -<span class="sourceLineNo">336</span><a name="line.336"></a> -<span class="sourceLineNo">337</span> private User getActiveUser() {<a name="line.337"></a> -<span class="sourceLineNo">338</span> User user = RpcServer.getRequestUser();<a name="line.338"></a> -<span class="sourceLineNo">339</span> if (user == null) {<a name="line.339"></a> -<span class="sourceLineNo">340</span> return null;<a name="line.340"></a> -<span class="sourceLineNo">341</span> }<a name="line.341"></a> -<span class="sourceLineNo">342</span><a name="line.342"></a> -<span class="sourceLineNo">343</span> //this is for testing<a name="line.343"></a> -<span class="sourceLineNo">344</span> if (userProvider.isHadoopSecurityEnabled()<a name="line.344"></a> -<span class="sourceLineNo">345</span> && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {<a name="line.345"></a> -<span class="sourceLineNo">346</span> return User.createUserForTesting(conf, user.getShortName(), new String[]{});<a name="line.346"></a> -<span class="sourceLineNo">347</span> }<a name="line.347"></a> -<span class="sourceLineNo">348</span><a name="line.348"></a> -<span class="sourceLineNo">349</span> return user;<a name="line.349"></a> -<span class="sourceLineNo">350</span> }<a name="line.350"></a> -<span class="sourceLineNo">351</span><a name="line.351"></a> -<span class="sourceLineNo">352</span> @Override<a name="line.352"></a> -<span class="sourceLineNo">353</span> public Service getService() {<a name="line.353"></a> -<span class="sourceLineNo">354</span> return this;<a name="line.354"></a> -<span class="sourceLineNo">355</span> }<a name="line.355"></a> -<span class="sourceLineNo">356</span><a name="line.356"></a> -<span class="sourceLineNo">357</span> private static class SecureBulkLoadListener implements BulkLoadListener {<a name="line.357"></a> -<span class="sourceLineNo">358</span> // Target filesystem<a name="line.358"></a> -<span class="sourceLineNo">359</span> private FileSystem fs;<a name="line.359"></a> -<span class="sourceLineNo">360</span> private String stagingDir;<a name="line.360"></a> -<span class="sourceLineNo">361</span> private Configuration conf;<a name="line.361"></a> -<span class="sourceLineNo">362</span> // Source filesystem<a name="line.362"></a> -<span class="sourceLineNo">363</span> private FileSystem srcFs = null;<a name="line.363"></a> -<span class="sourceLineNo">364</span> private Map<String, FsPermission> origPermissions = null;<a name="line.364"></a> -<span class="sourceLineNo">365</span><a name="line.365"></a> -<span class="sourceLineNo">366</span> public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {<a name="line.366"></a> -<span class="sourceLineNo">367</span> this.fs = fs;<a name="line.367"></a> -<span class="sourceLineNo">368</span> this.stagingDir = stagingDir;<a name="line.368"></a> -<span class="sourceLineNo">369</span> this.conf = conf;<a name="line.369"></a> -<span class="sourceLineNo">370</span> this.origPermissions = new HashMap<String, FsPermission>();<a name="line.370"></a> -<span class="sourceLineNo">371</span> }<a name="line.371"></a> -<span class="sourceLineNo">372</span><a name="line.372"></a> -<span class="sourceLineNo">373</span> @Override<a name="line.373"></a> -<span class="sourceLineNo">374</span> public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {<a name="line.374"></a> -<span class="sourceLineNo">375</span> Path p = new Path(srcPath);<a name="line.375"></a> -<span class="sourceLineNo">376</span> Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));<a name="line.376"></a> -<span class="sourceLineNo">377</span><a name="line.377"></a> -<span class="sourceLineNo">378</span> // In case of Replication for bulk load files, hfiles are already copied in staging directory<a name="line.378"></a> -<span class="sourceLineNo">379</span> if (p.equals(stageP)) {<a name="line.379"></a> -<span class="sourceLineNo">380</span> LOG.debug(p.getName()<a name="line.380"></a> -<span class="sourceLineNo">381</span> + " is already available in staging directory. Skipping copy or rename.");<a name="line.381"></a> -<span class="sourceLineNo">382</span> return stageP.toString();<a name="line.382"></a> -<span class="sourceLineNo">383</span> }<a name="line.383"></a> -<span class="sourceLineNo">384</span><a name="line.384"></a> -<span class="sourceLineNo">385</span> if (srcFs == null) {<a name="line.385"></a> -<span class="sourceLineNo">386</span> srcFs = FileSystem.get(p.toUri(), conf);<a name="line.386"></a> -<span class="sourceLineNo">387</span> }<a name="line.387"></a> -<span class="sourceLineNo">388</span><a name="line.388"></a> -<span class="sourceLineNo">389</span> if(!isFile(p)) {<a name="line.389"></a> -<span class="sourceLineNo">390</span> throw new IOException("Path does not reference a file: " + p);<a name="line.390"></a> -<span class="sourceLineNo">391</span> }<a name="line.391"></a> -<span class="sourceLineNo">392</span><a name="line.392"></a> -<span class="sourceLineNo">393</span> // Check to see if the source and target filesystems are the same<a name="line.393"></a> -<span class="sourceLineNo">394</span> if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {<a name="line.394"></a> -<span class="sourceLineNo">395</span> LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +<a name="line.395"></a> -<span class="sourceLineNo">396</span> "the destination filesystem. Copying file over to destination staging dir.");<a name="line.396"></a> -<span class="sourceLineNo">397</span> FileUtil.copy(srcFs, p, fs, stageP, false, conf);<a name="line.397"></a> -<span class="sourceLineNo">398</span> } else {<a name="line.398"></a> -<span class="sourceLineNo">399</span> LOG.debug("Moving " + p + " to " + stageP);<a name="line.399"></a> -<span class="sourceLineNo">400</span> FileStatus origFileStatus = fs.getFileStatus(p);<a name="line.400"></a> -<span class="sourceLineNo">401</span> origPermissions.put(srcPath, origFileStatus.getPermission());<a name="line.401"></a> -<span class="sourceLineNo">402</span> if(!fs.rename(p, stageP)) {<a name="line.402"></a> -<span class="sourceLineNo">403</span> throw new IOException("Failed to move HFile: " + p + " to " + stageP);<a name="line.403"></a> -<span class="sourceLineNo">404</span> }<a name="line.404"></a> -<span class="sourceLineNo">405</span> }<a name="line.405"></a> -<span class="sourceLineNo">406</span> fs.setPermission(stageP, PERM_ALL_ACCESS);<a name="line.406"></a> -<span class="sourceLineNo">407</span> return stageP.toString();<a name="line.407"></a> -<span class="sourceLineNo">408</span> }<a name="line.408"></a> -<span class="sourceLineNo">409</span><a name="line.409"></a> -<span class="sourceLineNo">410</span> @Override<a name="line.410"></a> -<span class="sourceLineNo">411</span> public void doneBulkLoad(byte[] family, String srcPath) throws IOException {<a name="line.411"></a> -<span class="sourceLineNo">412</span> LOG.debug("Bulk Load done for: " + srcPath);<a name="line.412"></a> -<span class="sourceLineNo">413</span> }<a name="line.413"></a> -<span class="sourceLineNo">414</span><a name="line.414"></a> -<span class="sourceLineNo">415</span> @Override<a name="line.415"></a> -<span class="sourceLineNo">416</span> public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {<a name="line.416"></a> -<span class="sourceLineNo">417</span> if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {<a name="line.417"></a> -<span class="sourceLineNo">418</span> // files are copied so no need to move them back<a name="line.418"></a> -<span class="sourceLineNo">419</span> return;<a name="line.419"></a> -<span class="sourceLineNo">420</span> }<a name="line.420"></a> -<span class="sourceLineNo">421</span> Path p = new Path(srcPath);<a name="line.421"></a> -<span class="sourceLineNo">422</span> Path stageP = new Path(stagingDir,<a name="line.422"></a> -<span class="sourceLineNo">423</span> new Path(Bytes.toString(family), p.getName()));<a name="line.423"></a> +<span class="sourceLineNo">295</span> } finally {<a name="line.295"></a> +<span class="sourceLineNo">296</span> if (fs != null) {<a name="line.296"></a> +<span class="sourceLineNo">297</span> try {<a name="line.297"></a> +<span class="sourceLineNo">298</span> if(!UserGroupInformation.getCurrentUser().equals(ugi)) {<a name="line.298"></a> +<span class="sourceLineNo">299</span> FileSystem.closeAllForUGI(ugi);<a name="line.299"></a> +<span class="sourceLineNo">300</span> }<a name="line.300"></a> +<span class="sourceLineNo">301</span> } catch (IOException e) {<a name="line.301"></a> +<span class="sourceLineNo">302</span> LOG.error("Failed to close FileSystem for " + ugi.getUserName(), e);<a name="line.302"></a> +<span class="sourceLineNo">303</span> }<a name="line.303"></a> +<span class="sourceLineNo">304</span> }<a name="line.304"></a> +<span class="sourceLineNo">305</span> }<a name="line.305"></a> +<span class="sourceLineNo">306</span> return false;<a name="line.306"></a> +<span class="sourceLineNo">307</span> }<a name="line.307"></a> +<span class="sourceLineNo">308</span> });<a name="line.308"></a> +<span class="sourceLineNo">309</span> }<a name="line.309"></a> +<span class="sourceLineNo">310</span> if (region.getCoprocessorHost() != null) {<a name="line.310"></a> +<span class="sourceLineNo">311</span> try {<a name="line.311"></a> +<span class="sourceLineNo">312</span> loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);<a name="line.312"></a> +<span class="sourceLineNo">313</span> } catch (IOException e) {<a name="line.313"></a> +<span class="sourceLineNo">314</span> ResponseConverter.setControllerException(controller, e);<a name="line.314"></a> +<span class="sourceLineNo">315</span> done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build());<a name="line.315"></a> +<span class="sourceLineNo">316</span> return;<a name="line.316"></a> +<span class="sourceLineNo">317</span> }<a name="line.317"></a> +<span class="sourceLineNo">318</span> }<a name="line.318"></a> +<span class="sourceLineNo">319</span> done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build());<a name="line.319"></a> +<span class="sourceLineNo">320</span> }<a name="line.320"></a> +<span class="sourceLineNo">321</span><a name="line.321"></a> +<span class="sourceLineNo">322</span> private List<BulkLoadObserver> getBulkLoadObservers() {<a name="line.322"></a> +<span class="sourceLineNo">323</span> List<BulkLoadObserver> coprocessorList =<a name="line.323"></a> +<span class="sourceLineNo">324</span> this.env.getRegion().getCoprocessorHost().findCoprocessors(BulkLoadObserver.class);<a name="line.324"></a> +<span class="sourceLineNo">325</span><a name="line.325"></a> +<span class="sourceLineNo">326</span> return coprocessorList;<a name="line.326"></a> +<span class="sourceLineNo">327</span> }<a name="line.327"></a> +<span class="sourceLineNo">328</span><a name="line.328"></a> +<span class="sourceLineNo">329</span> private Path createStagingDir(Path baseDir,<a name="line.329"></a> +<span class="sourceLineNo">330</span> User user,<a name="line.330"></a> +<span class="sourceLineNo">331</span> TableName tableName) throws IOException {<a name="line.331"></a> +<span class="sourceLineNo">332</span> String tblName = tableName.getNameAsString().replace(":", "_");<a name="line.332"></a> +<span class="sourceLineNo">333</span> String randomDir = user.getShortName()+"__"+ tblName +"__"+<a name="line.333"></a> +<span class="sourceLineNo">334</span> (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX));<a name="line.334"></a> +<span class="sourceLineNo">335</span> return createStagingDir(baseDir, user, randomDir);<a name="line.335"></a> +<span class="sourceLineNo">336</span> }<a name="line.336"></a> +<span class="sourceLineNo">337</span><a name="line.337"></a> +<span class="sourceLineNo">338</span> private Path createStagingDir(Path baseDir,<a name="line.338"></a> +<span class="sourceLineNo">339</span> User user,<a name="line.339"></a> +<span class="sourceLineNo">340</span> String randomDir) throws IOException {<a name="line.340"></a> +<span class="sourceLineNo">341</span> Path p = new Path(baseDir, randomDir);<a name="line.341"></a> +<span class="sourceLineNo">342</span> fs.mkdirs(p, PERM_ALL_ACCESS);<a name="line.342"></a> +<span class="sourceLineNo">343</span> fs.setPermission(p, PERM_ALL_ACCESS);<a name="line.343"></a> +<span class="sourceLineNo">344</span> return p;<a name="line.344"></a> +<span class="sourceLineNo">345</span> }<a name="line.345"></a> +<span class="sourceLineNo">346</span><a name="line.346"></a> +<span class="sourceLineNo">347</span> private User getActiveUser() {<a name="line.347"></a> +<span class="sourceLineNo">348</span> User user = RpcServer.getRequestUser();<a name="line.348"></a> +<span class="sourceLineNo">349</span> if (user == null) {<a name="line.349"></a> +<span class="sourceLineNo">350</span> return null;<a name="line.350"></a> +<span class="sourceLineNo">351</span> }<a name="line.351"></a> +<span class="sourceLineNo">352</span><a name="line.352"></a> +<span class="sourceLineNo">353</span> //this is for testing<a name="line.353"></a> +<span class="sourceLineNo">354</span> if (userProvider.isHadoopSecurityEnabled()<a name="line.354"></a> +<span class="sourceLineNo">355</span> && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {<a name="line.355"></a> +<span class="sourceLineNo">356</span> return User.createUserForTesting(conf, user.getShortName(), new String[]{});<a name="line.356"></a> +<span class="sourceLineNo">357</span> }<a name="line.357"></a> +<span class="sourceLineNo">358</span><a name="line.358"></a> +<span class="sourceLineNo">359</span> return user;<a name="line.359"></a> +<span class="sourceLineNo">360</span> }<a name="line.360"></a> +<span class="sourceLineNo">361</span><a name="line.361"></a> +<span class="sourceLineNo">362</span> @Override<a name="line.362"></a> +<span class="sourceLineNo">363</span> public Service getService() {<a name="line.363"></a> +<span class="sourceLineNo">364</span> return this;<a name="line.364"></a> +<span class="sourceLineNo">365</span> }<a name="line.365"></a> +<span class="sourceLineNo">366</span><a name="line.366"></a> +<span class="sourceLineNo">367</span> private static class SecureBulkLoadListener implements BulkLoadListener {<a name="line.367"></a> +<span class="sourceLineNo">368</span> // Target filesystem<a name="line.368"></a> +<span class="sourceLineNo">369</span> private FileSystem fs;<a name="line.369"></a> +<span class="sourceLineNo">370</span> private String stagingDir;<a name="line.370"></a> +<span class="sourceLineNo">371</span> private Configuration conf;<a name="line.371"></a> +<span class="sourceLineNo">372</span> // Source filesystem<a name="line.372"></a> +<span class="sourceLineNo">373</span> private FileSystem srcFs = null;<a name="line.373"></a> +<span class="sourceLineNo">374</span> private Map<String, FsPermission> origPermissions = null;<a name="line.374"></a> +<span class="sourceLineNo">375</span><a name="line.375"></a> +<span class="sourceLineNo">376</span> public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {<a name="line.376"></a> +<span class="sourceLineNo">377</span> this.fs = fs;<a name="line.377"></a> +<span class="sourceLineNo">378</span> this.stagingDir = stagingDir;<a name="line.378"></a> +<span class="sourceLineNo">379</span> this.conf = conf;<a name="line.379"></a> +<span class="sourceLineNo">380</span> this.origPermissions = new HashMap<String, FsPermission>();<a name="line.380"></a> +<span class="sourceLineNo">381</span> }<a name="line.381"></a> +<span class="sourceLineNo">382</span><a name="line.382"></a> +<span class="sourceLineNo">383</span> @Override<a name="line.383"></a> +<span class="sourceLineNo">384</span> public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {<a name="line.384"></a> +<span class="sourceLineNo">385</span> Path p = new Path(srcPath);<a name="line.385"></a> +<span class="sourceLineNo">386</span> Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));<a name="line.386"></a> +<span class="sourceLineNo">387</span><a name="line.387"></a> +<span class="sourceLineNo">388</span> // In case of Replication for bulk load files, hfiles are already copied in staging directory<a name="line.388"></a> +<span class="sourceLineNo">389</span> if (p.equals(stageP)) {<a name="line.389"></a> +<span class="sourceLineNo">390</span> LOG.debug(p.getName()<a name="line.390"></a> +<span class="sourceLineNo">391</span> + " is already available in staging directory. Skipping copy or rename.");<a name="line.391"></a> +<span class="sourceLineNo">392</span> return stageP.toString();<a name="line.392"></a> +<span class="sourceLineNo">393</span> }<a name="line.393"></a> +<span class="sourceLineNo">394</span><a name="line.394"></a> +<span class="sourceLineNo">395</span> if (srcFs == null) {<a name="line.395"></a> +<span class="sourceLineNo">396</span> srcFs = FileSystem.get(p.toUri(), conf);<a name="line.396"></a> +<span class="sourceLineNo">397</span> }<a name="line.397"></a> +<span class="sourceLineNo">398</span><a name="line.398"></a> +<span class="sourceLineNo">399</span> if(!isFile(p)) {<a name="line.399"></a> +<span class="sourceLineNo">400</span> throw new IOException("Path does not reference a file: " + p);<a name="line.400"></a> +<span class="sourceLineNo">401</span> }<a name="line.401"></a> +<span class="sourceLineNo">402</span><a name="line.402"></a> +<span class="sourceLineNo">403</span> // Check to see if the source and target filesystems are the same<a name="line.403"></a> +<span class="sourceLineNo">404</span> if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {<a name="line.404"></a> +<span class="sourceLineNo">405</span> LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +<a name="line.405"></a> +<span class="sourceLineNo">406</span> "the destination filesystem. Copying file over to destination staging dir.");<a name="line.406"></a> +<span class="sourceLineNo">407</span> FileUtil.copy(srcFs, p, fs, stageP, false, conf);<a name="line.407"></a> +<span class="sourceLineNo">408</span> } else {<a name="line.408"></a> +<span class="sourceLineNo">409</span> LOG.debug("Moving " + p + " to " + stageP);<a name="line.409"></a> +<span class="sourceLineNo">410</span> FileStatus origFileStatus = fs.getFileStatus(p);<a name="line.410"></a> +<span class="sourceLineNo">411</span> origPermissions.put(srcPath, origFileStatus.getPermission());<a name="line.411"></a> +<span class="sourceLineNo">412</span> if(!fs.rename(p, stageP)) {<a name="line.412"></a> +<span class="sourceLineNo">413</span> throw new IOException("Failed to move HFile: " + p + " to " + stageP);<a name="line.413"></a> +<span class="sourceLineNo">414</span> }<a name="line.414"></a> +<span class="sourceLineNo">415</span> }<a name="line.415"></a> +<span class="sourceLineNo">416</span> fs.setPermission(stageP, PERM_ALL_ACCESS);<a name="line.416"></a> +<span class="sourceLineNo">417</span> return stageP.toString();<a name="line.417"></a> +<span class="sourceLineNo">418</span> }<a name="line.418"></a> +<span class="sourceLineNo">419</span><a name="line.419"></a> +<span class="sourceLineNo">420</span> @Override<a name="line.420"></a> +<span class="sourceLineNo">421</span> public void doneBulkLoad(byte[] family, String srcPath) throws IOException {<a name="line.421"></a> +<span class="sourceLineNo">422</span> LOG.debug("Bulk Load done for: " + srcPath);<a name="line.422"></a> +<span class="sourceLineNo">423</span> }<a name="line.423"></a> <span class="sourceLineNo">424</span><a name="line.424"></a> -<span class="sourceLineNo">425</span> // In case of Replication for bulk load files, hfiles are not renamed by end point during<a name="line.425"></a> -<span class="sourceLineNo">426</span> // prepare stage, so no need of rename here again<a name="line.426"></a> -<span class="sourceLineNo">427</span> if (p.equals(stageP)) {<a name="line.427"></a> -<span class="sourceLineNo">428</span> LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");<a name="line.428"></a> +<span class="sourceLineNo">425</span> @Override<a name="line.425"></a> +<span class="sourceLineNo">426</span> public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {<a name="line.426"></a> +<span class="sourceLineNo">427</span> if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) {<a name="line.427"></a> +<span class="sourceLineNo">428</span> // files are copied so no need to move them back<a name="line.428"></a> <span class="sourceLineNo">429</span> return;<a name="line.429"></a> <span class="sourceLineNo">430</span> }<a name="line.430"></a> -<span class="sourceLineNo">431</span><a name="line.431"></a> -<span class="sourceLineNo">432</span> LOG.debug("Moving " + stageP + " back to " + p);<a name="line.432"></a> -<span class="sourceLineNo">433</span> if(!fs.rename(stageP, p))<a name="line.433"></a> -<span class="sourceLineNo">434</span> throw new IOException("Failed to move HFile: " + stageP + " to " + p);<a name="line.434"></a> -<span class="sourceLineNo">435</span><a name="line.435"></a> -<span class="sourceLineNo">436</span> // restore original permission<a name="line.436"></a> -<span class="sourceLineNo">437</span> if (origPermissions.containsKey(srcPath)) {<a name="line.437"></a> -<span class="sourceLineNo">438</span> fs.setPermission(p, origPermissions.get(srcPath));<a name="line.438"></a> -<span class="sourceLineNo">439</span> } else {<a name="line.439"></a> -<span class="sourceLineNo">440</span> LOG.warn("Can't find previous permission for path=" + srcPath);<a name="line.440"></a> -<span class="sourceLineNo">441</span> }<a name="line.441"></a> -<span class="sourceLineNo">442</span> }<a name="line.442"></a> -<span class="sourceLineNo">443</span><a name="line.443"></a> -<span class="sourceLineNo">444</span> /**<a name="line.444"></a> -<span class="sourceLineNo">445</span> * Check if the path is referencing a file.<a name="line.445"></a> -<span class="sourceLineNo">446</span> * This is mainly needed to avoid symlinks.<a name="line.446"></a> -<span class="sourceLineNo">447</span> * @param p<a name="line.447"></a> -<span class="sourceLineNo">448</span> * @return true if the p is a file<a name="line.448"></a> -<span class="sourceLineNo">449</span> * @throws IOException<a name="line.449"></a> -<span class="sourceLineNo">450</span> */<a name="line.450"></a> -<span class="sourceLineNo">451</span> private boolean isFile(Path p) throws IOException {<a name="line.451"></a> -<span class="sourceLineNo">452</span> FileStatus status = srcFs.getFileStatus(p);<a name="line.452"></a> -<span class="sourceLineNo">453</span> boolean isFile = !status.isDirectory();<a name="line.453"></a> -<span class="sourceLineNo">454</span> try {<a name="line.454"></a> -<span class="sourceLineNo">455</span> isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);<a name="line.455"></a> -<span class="sourceLineNo">456</span> } catch (Exception e) {<a name="line.456"></a> -<span class="sourceLineNo">457</span> }<a name="line.457"></a> -<span class="sourceLineNo">458</span> return isFile;<a name="line.458"></a> -<span class="sourceLineNo">459</span> }<a name="line.459"></a> -<span class="sourceLineNo">460</span> }<a name="line.460"></a> -<span class="sourceLineNo">461</span>}<a name="line.461"></a> +<span class="sourceLineNo">431</span> Path p = new Path(srcPath);<a name="line.431"></a> +<span class="sourceLineNo">432</span> Path stageP = new Path(stagingDir,<a name="line.432"></a> +<span class="sourceLineNo">433</span> new Path(Bytes.toString(family), p.getName()));<a name="line.433"></a> +<span class="sourceLineNo">434</span><a name="line.434"></a> +<span class="sourceLineNo">435</span> // In case of Replication for bulk load files, hfiles are not renamed by end point during<a name="line.435"></a> +<span class="sourceLineNo">436</span> // prepare stage, so no need of rename here again<a name="line.436"></a> +<span class="sourceLineNo">437</span> if (p.equals(stageP)) {<a name="line.437"></a> +<span class="sourceLineNo">438</span> LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");<a name="line.438"></a> +<span class="sourceLineNo">439</span> return;<a name="line.439"></a> +<span class="sourceLineNo">440</span> }<a name="line.440"></a> +<span class="sourceLineNo">441</span><a name="line.441"></a> +<span class="sourceLineNo">442</span> LOG.debug("Moving " + stageP + " back to " + p);<a name="line.442"></a> +<span class="sourceLineNo">443</span> if(!fs.rename(stageP, p))<a name="line.443"></a> +<span class="sourceLineNo">444</span> throw new IOException("Failed to move HFile: " + stageP + " to " + p);<a name="line.444"></a> +<span class="sourceLineNo">445</span><a name="line.445"></a> +<span class="sourceLineNo">446</span> // restore original permission<a name="line.446"></a> +<span class="sourceLineNo">447</span> if (origPermissions.containsKey(srcPath)) {<a name="line.447"></a> +<span class="sourceLineNo">448</span> fs.setPermission(p, origPermissions.get(srcPath));<a name="line.448"></a> +<span class="sourceLineNo">449</span> } else {<a name="line.449"></a> +<span class="sourceLineNo">450</span> LOG.warn("Can't find previous permission for path=" + srcPath);<a name="line.450"></a> +<span class="sourceLineNo">451</span> }<a name="line.451"></a> +<span class="sourceLineNo">452</span> }<a name="line.452"></a> +<span class="sourceLineNo">453</span><a name="line.453"></a> +<span class="sourceLineNo">454</span> /**<a name="line.454"></a> +<span class="sourceLineNo">455</span> * Check if the path is referencing a file.<a name="line.455"></a> +<span class="sourceLineNo">456</span> * This is mainly needed to avoid symlinks.<a name="line.456"></a> +<span class="sourceLineNo">457</span> * @param p<a name="line.457"></a> +<span class="sourceLineNo">458</span> * @return true if the p is a file<a name="line.458"></a> +<span class="sourceLineNo">459</span> * @throws IOException<a name="line.459"></a> +<span class="sourceLineNo">460</span> */<a name="line.460"></a> +<span class="sourceLineNo">461</span> private boolean isFile(Path p) throws IOException {<a name="line.461"></a> +<span class="sourceLineNo">462</span> FileStatus status = srcFs.getFileStatus(p);<a name="line.462"></a> +<span class="sourceLineNo">463</span> boolean isFile = !status.isDirectory();<a name="line.463"></a> +<span class="sourceLineNo">464</span> try {<a name="line.464"></a> +<span class="sourceLineNo">465</span> isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null);<a name="line.465"></a> +<span class="sourceLineNo">466</span> } catch (Exception e) {<a name="line.466"></a> +<span class="sourceLineNo">467</span> }<a name="line.467"></a> +<span class="sourceLineNo">468</span> return isFile;<a name="line.468"></a> +<span class="sourceLineNo">469</span> }<a name="line.469"></a> +<span class="sourceLineNo">470</span> }<a name="line.470"></a> +<span class="sourceLineNo">471</span>}<a name="line.471"></a>