http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/accumulo.pig/pom.xml ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/pom.xml b/pig/accumulo.pig/pom.xml index 998d1ca..1a1e5f9 100644 --- a/pig/accumulo.pig/pom.xml +++ b/pig/accumulo.pig/pom.xml @@ -1,56 +1,50 @@ <?xml version="1.0" encoding="UTF-8"?> + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> <parent> - <groupId>mvm.rya</groupId> + <groupId>org.apache.rya</groupId> <artifactId>rya.pig</artifactId> <version>3.2.10-SNAPSHOT</version> </parent> - <modelVersion>4.0.0</modelVersion> <artifactId>accumulo.pig</artifactId> - <name>${project.groupId}.${project.artifactId}</name> + <name>Apache Rya Accumulo Pig</name> + <dependencies> <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <exclusions> - <!-- the log4j that comes with zookeeper 3.3.5 has some bad dependencies --> - <exclusion> - <groupId>javax.jms</groupId> - <artifactId>jms</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jdmk</groupId> - <artifactId>jmxtools</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jmx</groupId> - <artifactId>jmxri</artifactId> - </exclusion> - </exclusions> + <groupId>org.apache.rya</groupId> + <artifactId>rya.sail</artifactId> </dependency> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>mvm.rya</groupId> - <artifactId>rya.sail.impl</artifactId> - </dependency> - <dependency> - <groupId>mvm.rya</groupId> + <groupId>org.apache.rya</groupId> <artifactId>accumulo.rya</artifactId> </dependency> + <dependency> <groupId>org.openrdf.sesame</groupId> <artifactId>sesame-queryparser-sparql</artifactId> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>provided</scope> - </dependency> + <dependency> <groupId>org.apache.pig</groupId> <artifactId>pig</artifactId> @@ -61,43 +55,30 @@ <artifactId>antlr-runtime</artifactId> <scope>provided</scope> </dependency> + + <dependency> + <groupId>junit</groupId> + 
<artifactId>junit</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/ResultsFile1.txt</exclude> + <exclude>src/test/resources/testQuery.txt</exclude> + <exclude>src/test/resources/testQuery2.txt</exclude> + </excludes> + </configuration> + </plugin> + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> </plugin> </plugins> </build> - - <profiles> - <profile> - <id>accumulo</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> - <dependencies> - <dependency> - <groupId>org.apache.accumulo</groupId> - <artifactId>accumulo-core</artifactId> - <optional>true</optional> - </dependency> - </dependencies> - </profile> - <profile> - <id>cloudbase</id> - <activation> - <activeByDefault>false</activeByDefault> - </activation> - <dependencies> - <dependency> - <groupId>com.texeltek</groupId> - <artifactId>accumulo-cloudbase-shim</artifactId> - <optional>true</optional> - </dependency> - </dependencies> - </profile> - </profiles> - </project>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java index 6ffedfe..054146d 100644 --- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java +++ b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/AccumuloStorage.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.pig; /* - * #%L - * mvm.rya.accumulo.pig - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -379,4 +380,4 @@ public class AccumuloStorage extends LoadFunc implements StoreFuncInterface, Ord stream.close(); return range; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java index d5c289d..392c108 100644 --- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java +++ b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/IndexWritingTool.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.pig; /* - * #%L - * mvm.rya.accumulo.pig - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import java.io.File; import java.io.IOException; import java.util.List; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java index 10ddf4c..ed8134d 100644 --- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java +++ b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlQueryPigEngine.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.pig; /* - * #%L - * mvm.rya.accumulo.pig - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import com.google.common.base.Preconditions; import com.google.common.io.ByteStreams; import mvm.rya.accumulo.AccumuloRdfConfiguration; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java index b2d6886..38d8adb 100644 --- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java +++ b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitor.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.pig; /* - * #%L - * mvm.rya.accumulo.pig - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ + + import org.openrdf.model.Literal; import org.openrdf.model.URI; import org.openrdf.model.Value; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java index 86d9356..9ec9d45 100644 --- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java +++ b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/StatementPatternStorage.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.pig; /* - * #%L - * mvm.rya.accumulo.pig - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import java.io.IOException; import java.util.Collection; import java.util.Map; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java index 181f72b..4b458b6 100644 --- a/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java +++ b/pig/accumulo.pig/src/main/java/mvm/rya/accumulo/pig/optimizer/SimilarVarJoinOptimizer.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.pig.optimizer; /* - * #%L - * mvm.rya.accumulo.pig - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import org.openrdf.query.BindingSet; import org.openrdf.query.Dataset; import org.openrdf.query.algebra.*; @@ -206,4 +207,4 @@ public class SimilarVarJoinOptimizer implements QueryOptimizer { } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java b/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java index ca52afb..119ccb1 100644 --- a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java +++ b/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/AccumuloStorageTest.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.pig; /* - * #%L - * mvm.rya.accumulo.pig - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ + + import java.io.IOException; import java.util.ArrayList; import java.util.List; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/IndexWritingToolTest.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/IndexWritingToolTest.java b/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/IndexWritingToolTest.java index 33ed54b..02a6f84 100644 --- a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/IndexWritingToolTest.java +++ b/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/IndexWritingToolTest.java @@ -1,5 +1,25 @@ package mvm.rya.accumulo.pig; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + import java.io.File; import java.io.IOException; import java.util.Map; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/SparqlQueryPigEngineTest.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/SparqlQueryPigEngineTest.java b/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/SparqlQueryPigEngineTest.java index d6cde67..e4cf10e 100644 --- a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/SparqlQueryPigEngineTest.java +++ b/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/SparqlQueryPigEngineTest.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.pig; /* - * #%L - * mvm.rya.accumulo.pig - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import junit.framework.TestCase; import org.apache.pig.ExecType; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitorTest.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitorTest.java b/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitorTest.java index 1b0a383..b011a24 100644 --- a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitorTest.java +++ b/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/SparqlToPigTransformVisitorTest.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.pig; /* - * #%L - * mvm.rya.accumulo.pig - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ + + import junit.framework.TestCase; import mvm.rya.accumulo.pig.optimizer.SimilarVarJoinOptimizer; import org.openrdf.query.algebra.QueryRoot; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/StatementPatternStorageTest.java ---------------------------------------------------------------------- diff --git a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/StatementPatternStorageTest.java b/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/StatementPatternStorageTest.java index ea6d438..5bc4a34 100644 --- a/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/StatementPatternStorageTest.java +++ b/pig/accumulo.pig/src/test/java/mvm/rya/accumulo/pig/StatementPatternStorageTest.java @@ -1,25 +1,26 @@ package mvm.rya.accumulo.pig; /* - * #%L - * mvm.rya.accumulo.pig - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/ + + import java.io.IOException; import java.util.ArrayList; import java.util.List; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/cloudbase.pig/pom.xml ---------------------------------------------------------------------- diff --git a/pig/cloudbase.pig/pom.xml b/pig/cloudbase.pig/pom.xml deleted file mode 100644 index dfd44cf..0000000 --- a/pig/cloudbase.pig/pom.xml +++ /dev/null @@ -1,65 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <groupId>mvm.rya</groupId> - <artifactId>rya.pig</artifactId> - <version>3.2.10-SNAPSHOT</version> - </parent> - <modelVersion>4.0.0</modelVersion> - - <artifactId>cloudbase.pig</artifactId> - <name>${project.groupId}.${project.artifactId}</name> - <dependencies> - <dependency> - <groupId>cloudbase</groupId> - <artifactId>cloudbase-core</artifactId> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </dependency> - <dependency> - <groupId>mvm.rya</groupId> - <artifactId>rya.sail.impl</artifactId> - </dependency> - <dependency> - <groupId>mvm.rya</groupId> - <artifactId>cloudbase.rya</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-queryparser-sparql</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.pig</groupId> - <artifactId>pig</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.antlr</groupId> - <artifactId>antlr-runtime</artifactId> - 
<scope>provided</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - </plugin> - </plugins> - </build> - - -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/CloudbaseStorage.java ---------------------------------------------------------------------- diff --git a/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/CloudbaseStorage.java b/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/CloudbaseStorage.java deleted file mode 100644 index bfda504..0000000 --- a/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/CloudbaseStorage.java +++ /dev/null @@ -1,318 +0,0 @@ -package mvm.rya.cloudbase.pig; - -import cloudbase.core.CBConstants; -import cloudbase.core.client.mapreduce.CloudbaseInputFormat; -import cloudbase.core.client.mapreduce.CloudbaseOutputFormat; -import cloudbase.core.data.Key; -import cloudbase.core.data.Mutation; -import cloudbase.core.data.Range; -import cloudbase.core.data.Value; -import cloudbase.core.security.Authorizations; -import cloudbase.core.security.ColumnVisibility; -import cloudbase.core.util.Pair; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.*; -import org.apache.pig.LoadFunc; -import org.apache.pig.ResourceSchema; -import org.apache.pig.StoreFuncInterface; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; -import org.apache.pig.data.DataByteArray; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import 
java.util.LinkedList; -import java.util.List; - -/** - * A LoadStoreFunc for retrieving data from and storing data to Accumulo - * <p/> - * A Key/Val pair will be returned as tuples: (key, colfam, colqual, colvis, timestamp, value). All fields except timestamp are DataByteArray, timestamp is a long. - * <p/> - * Tuples can be written in 2 forms: - * (key, colfam, colqual, colvis, value) - * OR - * (key, colfam, colqual, value) - */ -public class CloudbaseStorage extends LoadFunc implements StoreFuncInterface { - private static final Log logger = LogFactory.getLog(CloudbaseStorage.class); - - protected Configuration conf; - protected RecordReader<Key, Value> reader; - protected RecordWriter<Text, Mutation> writer; - - protected String inst; - protected String zookeepers; - protected String user = ""; - protected String password = ""; - protected String table; - protected Text tableName; - protected String auths; - protected Authorizations authorizations = CBConstants.NO_AUTHS; - protected List<Pair<Text, Text>> columnFamilyColumnQualifierPairs = new LinkedList<Pair<Text, Text>>(); - - protected Collection<Range> ranges = new ArrayList<Range>(); - protected boolean mock = false; - - public CloudbaseStorage() { - } - - @Override - public Tuple getNext() throws IOException { - try { - // load the next pair - if (!reader.nextKeyValue()) { - logger.info("Reached end of results"); - return null; - } - - Key key = (Key) reader.getCurrentKey(); - Value value = (Value) reader.getCurrentValue(); - assert key != null && value != null; - - if (logger.isTraceEnabled()) { - logger.trace("Found key[" + key + "] and value[" + value + "]"); - } - - // and wrap it in a tuple - Tuple tuple = TupleFactory.getInstance().newTuple(6); - tuple.set(0, new DataByteArray(key.getRow().getBytes())); - tuple.set(1, new DataByteArray(key.getColumnFamily().getBytes())); - tuple.set(2, new DataByteArray(key.getColumnQualifier().getBytes())); - tuple.set(3, new 
DataByteArray(key.getColumnVisibility().getBytes())); - tuple.set(4, key.getTimestamp()); - tuple.set(5, new DataByteArray(value.get())); - if (logger.isTraceEnabled()) { - logger.trace("Output tuple[" + tuple + "]"); - } - return tuple; - } catch (InterruptedException e) { - throw new IOException(e.getMessage()); - } - } - - @Override - public InputFormat getInputFormat() { - return new CloudbaseInputFormat(); - } - - @Override - public void prepareToRead(RecordReader reader, PigSplit split) { - this.reader = reader; - } - - @Override - public void setLocation(String location, Job job) throws IOException { - if (logger.isDebugEnabled()) { - logger.debug("Set Location[" + location + "] for job[" + job.getJobName() + "]"); - } - conf = job.getConfiguration(); - setLocationFromUri(location, job); - - if (!conf.getBoolean(CloudbaseInputFormat.class.getSimpleName() + ".configured", false)) { - CloudbaseInputFormat.setInputInfo(job, user, password.getBytes(), table, authorizations); - if (!mock) { - CloudbaseInputFormat.setZooKeeperInstance(job, inst, zookeepers); - } else { - CloudbaseInputFormat.setMockInstance(job, inst); - } - } - if (columnFamilyColumnQualifierPairs.size() > 0) - CloudbaseInputFormat.fetchColumns(job, columnFamilyColumnQualifierPairs); - logger.info("Set ranges[" + ranges + "] for job[" + job.getJobName() + "] on table[" + table + "] " + - "for columns[" + columnFamilyColumnQualifierPairs + "] with authorizations[" + authorizations + "]"); - - if (ranges.size() == 0) { - throw new IOException("Cloudbase Range must be specified"); - } - CloudbaseInputFormat.setRanges(job, ranges); - } - - protected void setLocationFromUri(String uri, Job job) throws IOException { - // ex: cloudbase://table1?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&columns=col1|cq1,col2|cq2&range=a|z&range=1|9&mock=true - try { - if (!uri.startsWith("cloudbase://")) - throw new Exception("Bad scheme."); - String[] urlParts = 
uri.split("\\?"); - setLocationFromUriParts(urlParts); - - } catch (Exception e) { - throw new IOException("Expected 'cloudbase://<table>[?instance=<instanceName>&user=<user>&password=<password>&zookeepers=<zookeepers>&auths=<authorizations>&[range=startRow|endRow[...],columns=[cf1|cq1,cf2|cq2,...]],mock=true(false)]': " + e.getMessage(), e); - } - } - - protected void setLocationFromUriParts(String[] urlParts) { - String columns = ""; - if (urlParts.length > 1) { - for (String param : urlParts[1].split("&")) { - String[] pair = param.split("="); - if (pair[0].equals("instance")) { - inst = pair[1]; - } else if (pair[0].equals("user")) { - user = pair[1]; - } else if (pair[0].equals("password")) { - password = pair[1]; - } else if (pair[0].equals("zookeepers")) { - zookeepers = pair[1]; - } else if (pair[0].equals("auths")) { - auths = pair[1]; - } else if (pair[0].equals("columns")) { - columns = pair[1]; - } else if (pair[0].equals("range")) { - String[] r = pair[1].split("\\|"); - if (r.length == 2) { - addRange(new Range(r[0], r[1])); - } else { - addRange(new Range(r[0])); - } - } else if (pair[0].equals("mock")) { - this.mock = Boolean.parseBoolean(pair[1]); - } - addLocationFromUriPart(pair); - } - } - String[] parts = urlParts[0].split("/+"); - table = parts[1]; - tableName = new Text(table); - - if (auths == null || auths.equals("")) { - authorizations = new Authorizations(); - } else { - authorizations = new Authorizations(auths.split(",")); - } - - if (!columns.equals("")) { - for (String cfCq : columns.split(",")) { - if (cfCq.contains("|")) { - String[] c = cfCq.split("\\|"); - String cf = c[0]; - String cq = c[1]; - addColumnPair(cf, cq); - } else { - addColumnPair(cfCq, null); - } - } - } - } - - protected void addColumnPair(String cf, String cq) { - columnFamilyColumnQualifierPairs.add(new Pair<Text, Text>((cf != null) ? new Text(cf) : null, (cq != null) ? 
new Text(cq) : null)); - } - - protected void addLocationFromUriPart(String[] pair) { - - } - - protected void addRange(Range range) { - ranges.add(range); - } - - @Override - public String relativeToAbsolutePath(String location, Path curDir) throws IOException { - return location; - } - - @Override - public void setUDFContextSignature(String signature) { - - } - - /* StoreFunc methods */ - public void setStoreFuncUDFContextSignature(String signature) { - - } - - public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { - return relativeToAbsolutePath(location, curDir); - } - - public void setStoreLocation(String location, Job job) throws IOException { - conf = job.getConfiguration(); - setLocationFromUri(location, job); - - if (!conf.getBoolean(CloudbaseOutputFormat.class.getSimpleName() + ".configured", false)) { - CloudbaseOutputFormat.setOutputInfo(job, user, password.getBytes(), true, table); - CloudbaseOutputFormat.setZooKeeperInstance(job, inst, zookeepers); - CloudbaseOutputFormat.setMaxLatency(job, 10 * 1000); - CloudbaseOutputFormat.setMaxMutationBufferSize(job, 10 * 1000 * 1000); - CloudbaseOutputFormat.setMaxWriteThreads(job, 10); - } - } - - public OutputFormat getOutputFormat() { - return new CloudbaseOutputFormat(); - } - - public void checkSchema(ResourceSchema schema) throws IOException { - // we don't care about types, they all get casted to ByteBuffers - } - - public void prepareToWrite(RecordWriter writer) { - this.writer = writer; - } - - public void putNext(Tuple t) throws ExecException, IOException { - Mutation mut = new Mutation(objToText(t.get(0))); - Text cf = objToText(t.get(1)); - Text cq = objToText(t.get(2)); - - if (t.size() > 4) { - Text cv = objToText(t.get(3)); - Value val = new Value(objToBytes(t.get(4))); - if (cv.getLength() == 0) { - mut.put(cf, cq, val); - } else { - mut.put(cf, cq, new ColumnVisibility(cv), val); - } - } else { - Value val = new Value(objToBytes(t.get(3))); - mut.put(cf, 
cq, val); - } - - try { - writer.write(tableName, mut); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - - private static Text objToText(Object o) { - return new Text(objToBytes(o)); - } - - private static byte[] objToBytes(Object o) { - if (o instanceof String) { - String str = (String) o; - return str.getBytes(); - } else if (o instanceof Long) { - Long l = (Long) o; - return l.toString().getBytes(); - } else if (o instanceof Integer) { - Integer l = (Integer) o; - return l.toString().getBytes(); - } else if (o instanceof Boolean) { - Boolean l = (Boolean) o; - return l.toString().getBytes(); - } else if (o instanceof Float) { - Float l = (Float) o; - return l.toString().getBytes(); - } else if (o instanceof Double) { - Double l = (Double) o; - return l.toString().getBytes(); - } - - // TODO: handle DataBag, Map<Object, Object>, and Tuple - - return ((DataByteArray) o).get(); - } - - public void cleanupOnFailure(String failure, Job job) { - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/SparqlQueryPigEngine.java ---------------------------------------------------------------------- diff --git a/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/SparqlQueryPigEngine.java b/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/SparqlQueryPigEngine.java deleted file mode 100644 index daf6c4f..0000000 --- a/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/SparqlQueryPigEngine.java +++ /dev/null @@ -1,237 +0,0 @@ -package mvm.rya.cloudbase.pig; - -import cloudbase.core.client.Connector; -import cloudbase.core.client.ZooKeeperInstance; -import com.google.common.base.Preconditions; -import com.google.common.io.ByteStreams; -import mvm.rya.cloudbase.CloudbaseRdfConfiguration; -import mvm.rya.cloudbase.CloudbaseRdfEvalStatsDAO; -import mvm.rya.cloudbase.CloudbaseRyaDAO; -import mvm.rya.cloudbase.pig.optimizer.SimilarVarJoinOptimizer; 
-import mvm.rya.rdftriplestore.evaluation.QueryJoinOptimizer; -import mvm.rya.rdftriplestore.evaluation.RdfCloudTripleStoreEvaluationStatistics; -import mvm.rya.rdftriplestore.inference.InferenceEngine; -import mvm.rya.rdftriplestore.inference.InverseOfVisitor; -import mvm.rya.rdftriplestore.inference.SymmetricPropertyVisitor; -import mvm.rya.rdftriplestore.inference.TransitivePropertyVisitor; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.pig.ExecType; -import org.apache.pig.PigServer; -import org.openrdf.query.algebra.QueryRoot; -import org.openrdf.query.parser.ParsedQuery; -import org.openrdf.query.parser.QueryParser; -import org.openrdf.query.parser.sparql.SPARQLParser; - -import java.io.ByteArrayInputStream; -import java.io.FileInputStream; -import java.io.IOException; - -/** - * Created by IntelliJ IDEA. - * Date: 4/23/12 - * Time: 9:31 AM - * To change this template use File | Settings | File Templates. - */ -public class SparqlQueryPigEngine { - private static final Log logger = LogFactory.getLog(SparqlQueryPigEngine.class); - - private String hadoopDir; - private ExecType execType = ExecType.MAPREDUCE; //default to mapreduce - private boolean inference = true; - private boolean stats = true; - private SparqlToPigTransformVisitor sparqlToPigTransformVisitor; - private PigServer pigServer; - private InferenceEngine inferenceEngine = null; - private RdfCloudTripleStoreEvaluationStatistics rdfCloudTripleStoreEvaluationStatistics; - private CloudbaseRyaDAO ryaDAO; - private CloudbaseRdfEvalStatsDAO rdfEvalStatsDAO; - - public CloudbaseRdfConfiguration getConf() { - return conf; - } - - public void setConf(CloudbaseRdfConfiguration conf) { - this.conf = conf; - } - - CloudbaseRdfConfiguration conf = new CloudbaseRdfConfiguration(); - - public void init() throws Exception { - Preconditions.checkNotNull(sparqlToPigTransformVisitor, "Sparql To Pig Transform Visitor must not be null"); - 
logger.info("Initializing Sparql Query Pig Engine"); - if (hadoopDir != null) { - //set hadoop dir property - System.setProperty("HADOOPDIR", hadoopDir); - } - //TODO: Maybe have validation of the HadoopDir system property - - if (pigServer == null) { - pigServer = new PigServer(execType); - } - - if (inference || stats) { - Connector connector = new ZooKeeperInstance("stratus", "stratus13:2181").getConnector("root", "password".getBytes()); - - String tablePrefix = sparqlToPigTransformVisitor.getTablePrefix(); - conf.setTablePrefix(tablePrefix); - if (inference) { - logger.info("Using inference"); - inferenceEngine = new InferenceEngine(); - ryaDAO = new CloudbaseRyaDAO(); - ryaDAO.setConf(conf); - ryaDAO.setConnector(connector); - ryaDAO.init(); - - inferenceEngine.setRyaDAO(ryaDAO); - inferenceEngine.setConf(conf); - inferenceEngine.setSchedule(false); - inferenceEngine.init(); - } - if (stats) { - logger.info("Using stats"); - rdfEvalStatsDAO = new CloudbaseRdfEvalStatsDAO(); - rdfEvalStatsDAO.setConf(conf); - rdfEvalStatsDAO.setConnector(connector); -// rdfEvalStatsDAO.setEvalTable(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX); - rdfEvalStatsDAO.init(); - rdfCloudTripleStoreEvaluationStatistics = new RdfCloudTripleStoreEvaluationStatistics(conf, rdfEvalStatsDAO); - } - } - } - - public void destroy() throws Exception { - logger.info("Shutting down Sparql Query Pig Engine"); - pigServer.shutdown(); - if (ryaDAO != null) { - ryaDAO.destroy(); - } - if (inferenceEngine != null) { - inferenceEngine.destroy(); - } - if (rdfEvalStatsDAO != null) { - rdfEvalStatsDAO.destroy(); - } - } - - /** - * Transform a sparql query into a pig script and execute it. 
Save results in hdfsSaveLocation - * - * @param sparql to execute - * @param hdfsSaveLocation to save the execution - * @throws IOException - */ - public void runQuery(String sparql, String hdfsSaveLocation) throws IOException { - Preconditions.checkNotNull(sparql, "Sparql query cannot be null"); - Preconditions.checkNotNull(hdfsSaveLocation, "Hdfs save location cannot be null"); - logger.info("Running query[" + sparql + "]\n to Location[" + hdfsSaveLocation + "]"); - pigServer.deleteFile(hdfsSaveLocation); - try { - String pigScript = generatePigScript(sparql); - if (logger.isDebugEnabled()) { - logger.debug("Pig script [" + pigScript + "]"); - } - pigServer.registerScript(new ByteArrayInputStream(pigScript.getBytes())); - pigServer.store("PROJ", hdfsSaveLocation); //TODO: Make this a constant - } catch (Exception e) { - throw new IOException(e); - } - } - - public String generatePigScript(String sparql) throws Exception { - Preconditions.checkNotNull(sparql, "Sparql query cannot be null"); - QueryParser parser = new SPARQLParser(); - ParsedQuery parsedQuery = parser.parseQuery(sparql, null); - QueryRoot tupleExpr = new QueryRoot(parsedQuery.getTupleExpr()); - - SimilarVarJoinOptimizer similarVarJoinOptimizer = new SimilarVarJoinOptimizer(); - similarVarJoinOptimizer.optimize(tupleExpr, null, null); - - if (inference || stats) { - if (inference) { - tupleExpr.visit(new TransitivePropertyVisitor(conf, inferenceEngine)); - tupleExpr.visit(new SymmetricPropertyVisitor(conf, inferenceEngine)); - tupleExpr.visit(new InverseOfVisitor(conf, inferenceEngine)); - } - if (stats) { - (new QueryJoinOptimizer(rdfCloudTripleStoreEvaluationStatistics)).optimize(tupleExpr, null, null); - } - } - - sparqlToPigTransformVisitor.meet(tupleExpr); - return sparqlToPigTransformVisitor.getPigScript(); - } - - public static void main(String[] args) { - try { - Preconditions.checkArgument(args.length == 7, "Usage: java -cp <jar>:$PIG_LIB <class> sparqlFile hdfsSaveLocation cbinstance cbzk 
cbuser cbpassword rdfTablePrefix.\n " + - "Sample command: java -cp java -cp cloudbase.pig-2.0.0-SNAPSHOT-shaded.jar:/usr/local/hadoop-etc/hadoop-0.20.2/hadoop-0.20.2-core.jar:/srv_old/hdfs-tmp/pig/pig-0.9.2/pig-0.9.2.jar:$HADOOP_HOME/conf mvm.rya.cloudbase.pig.SparqlQueryPigEngine tstSpqrl.query temp/engineTest stratus stratus13:2181 root password l_"); - String sparql = new String(ByteStreams.toByteArray(new FileInputStream(args[0]))); - String hdfsSaveLocation = args[1]; - SparqlToPigTransformVisitor visitor = new SparqlToPigTransformVisitor(); - visitor.setTablePrefix(args[6]); - visitor.setInstance(args[2]); - visitor.setZk(args[3]); - visitor.setUser(args[4]); - visitor.setPassword(args[5]); - - SparqlQueryPigEngine engine = new SparqlQueryPigEngine(); - engine.setSparqlToPigTransformVisitor(visitor); - engine.init(); - - engine.runQuery(sparql, hdfsSaveLocation); - - engine.destroy(); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public String getHadoopDir() { - return hadoopDir; - } - - public void setHadoopDir(String hadoopDir) { - this.hadoopDir = hadoopDir; - } - - public PigServer getPigServer() { - return pigServer; - } - - public void setPigServer(PigServer pigServer) { - this.pigServer = pigServer; - } - - public ExecType getExecType() { - return execType; - } - - public void setExecType(ExecType execType) { - this.execType = execType; - } - - public boolean isInference() { - return inference; - } - - public void setInference(boolean inference) { - this.inference = inference; - } - - public boolean isStats() { - return stats; - } - - public void setStats(boolean stats) { - this.stats = stats; - } - - public SparqlToPigTransformVisitor getSparqlToPigTransformVisitor() { - return sparqlToPigTransformVisitor; - } - - public void setSparqlToPigTransformVisitor(SparqlToPigTransformVisitor sparqlToPigTransformVisitor) { - this.sparqlToPigTransformVisitor = sparqlToPigTransformVisitor; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/SparqlToPigTransformVisitor.java ---------------------------------------------------------------------- diff --git a/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/SparqlToPigTransformVisitor.java b/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/SparqlToPigTransformVisitor.java deleted file mode 100644 index e037597..0000000 --- a/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/SparqlToPigTransformVisitor.java +++ /dev/null @@ -1,323 +0,0 @@ -package mvm.rya.cloudbase.pig; - -import org.openrdf.model.Literal; -import org.openrdf.model.URI; -import org.openrdf.model.Value; -import org.openrdf.query.algebra.*; -import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; - -import java.util.*; - -/** - * Created by IntelliJ IDEA. - * Date: 4/12/12 - * Time: 10:17 AM - * To change this template use File | Settings | File Templates. - */ -public class SparqlToPigTransformVisitor extends QueryModelVisitorBase<RuntimeException> { - private StringBuilder pigScriptBuilder = new StringBuilder(); - private String tablePrefix; - private String instance, zk, user, password; //TODO: use a Configuration object to get these - - private Map<String, String> varToSet = new HashMap<String, String>(); - private Map<TupleExpr, List<String>> exprToNames = new HashMap<TupleExpr, List<String>>(); - private Map<TupleExpr, String> exprToVar = new HashMap<TupleExpr, String>(); - - private char i = 'A'; //TODO: do better, hack - - public SparqlToPigTransformVisitor() { - pigScriptBuilder.append("set pig.splitCombination false;\n") - .append("set default_parallel 13;\n") //TODO: set parallel properly - .append("set mapred.map.tasks.speculative.execution false;\n") - .append("set mapred.reduce.tasks.speculative.execution false;\n") - .append("set io.sort.mb 256;\n") - .append("set mapred.compress.map.output true;\n") - .append("set 
mapred.map.output.compression.codec org.apache.hadoop.io.compress.GzipCodec;\n") - .append("set io.file.buffer.size 65536;\n") - .append("set io.sort.factor 25;\n"); - } - - @Override - public void meet(StatementPattern node) throws RuntimeException { - super.meet(node); - String subjValue = getVarValue(node.getSubjectVar()); - String predValue = getVarValue(node.getPredicateVar()); - String objValue = getVarValue(node.getObjectVar()); - - String subj = i + "_s"; - String pred = i + "_p"; - String obj = i + "_o"; - String var = i + ""; - if (node.getSubjectVar().getValue() == null) { //TODO: look nicer - subj = node.getSubjectVar().getName(); - varToSet.put(subj, var); - - addToExprToNames(node, subj); - } - if (node.getPredicateVar().getValue() == null) { //TODO: look nicer - pred = node.getPredicateVar().getName(); - varToSet.put(pred, var); - - addToExprToNames(node, pred); - } - if (node.getObjectVar().getValue() == null) { //TODO: look nicer - obj = node.getObjectVar().getName(); - varToSet.put(obj, var); - - addToExprToNames(node, obj); - } - if(node.getContextVar() != null && node.getContextVar().getValue() == null) { - String cntxtName = node.getContextVar().getName(); - varToSet.put(cntxtName, var); - - addToExprToNames(node, cntxtName); - } - //load 'l_' using mvm.rya.cloudbase.pig.dep.StatementPatternStorage('<http://www.Department0.University0.edu>', '', '', - // 'stratus', 'stratus13:2181', 'root', 'password') AS (dept:chararray, p:chararray, univ:chararray); -// pigScriptBuilder.append(i).append(" = load '").append(tablePrefix).append("' using mvm.rya.cloudbase.pig.dep.StatementPatternStorage('") -// .append(subjValue).append("','").append(predValue).append("','").append(objValue).append("','").append(instance).append("','") -// .append(zk).append("','").append(user).append("','").append(password).append("') AS (").append(subj).append(":chararray, ") -// .append(pred).append(":chararray, ").append(obj).append(":chararray);\n"); - - //load 
'cloudbase://tablePrefix?instance=myinstance&user=root&password=secret&zookeepers=127.0.0.1:2181&auths=PRIVATE,PUBLIC&subject=a&predicate=b&object=c' - //using mvm.rya.cloudbase.pig.StatementPatternStorage() AS (dept:chararray, p:chararray, univ:chararray); - pigScriptBuilder.append(i).append(" = load 'cloudbase://").append(tablePrefix).append("?instance=").append(instance).append("&user=").append(user) - .append("&password=").append(password).append("&zookeepers=").append(zk); - if (subjValue != null && subjValue.length() > 0) { - pigScriptBuilder.append("&subject=").append(subjValue); - } - if (predValue != null && predValue.length() > 0) { - pigScriptBuilder.append("&predicate=").append(predValue); - } - if (objValue != null && objValue.length() > 0) { - pigScriptBuilder.append("&object=").append(objValue); - } - if(node.getContextVar() != null && node.getContextVar().getValue() != null) { - pigScriptBuilder.append("&context=").append(getVarValue(node.getContextVar())); - } - - pigScriptBuilder.append("' using mvm.rya.cloudbase.pig.StatementPatternStorage() AS (").append(subj).append(":chararray, ") - .append(pred).append(":chararray, ").append(obj).append(":chararray"); - if(node.getContextVar() != null) { - Value cntxtValue = node.getContextVar().getValue(); - String cntxtName = null; - if(cntxtValue == null) { - //use name - cntxtName = node.getContextVar().getName(); - } else { - cntxtName = i + "_c"; - } - pigScriptBuilder.append(", ").append(cntxtName).append(":chararray"); - } - pigScriptBuilder.append(");\n"); - //TODO: add auths - - exprToVar.put(node, var); - i++; - } - - private void addToExprToNames(TupleExpr node, String name) { - List<String> names = exprToNames.get(node); - if (names == null) { - names = new ArrayList<String>(); - exprToNames.put(node, names); - } - names.add(name); - } - - @Override - public void meet(Union node) throws RuntimeException { - super.meet(node); - - TupleExpr leftArg = node.getLeftArg(); - TupleExpr rightArg = 
node.getRightArg(); - String left_var = exprToVar.get(leftArg); - String right_var = exprToVar.get(rightArg); - //Q = UNION ONSCHEMA B, P; - pigScriptBuilder.append(i).append(" = UNION ONSCHEMA ").append(left_var).append(", ").append(right_var).append(";\n"); - - String unionVar = i + ""; - List<String> left_names = exprToNames.get(leftArg); - List<String> right_names = exprToNames.get(rightArg); - for (String name : left_names) { - varToSet.put(name, unionVar); - addToExprToNames(node, name); - } - for (String name : right_names) { - varToSet.put(name, unionVar); - addToExprToNames(node, name); - } - exprToVar.put(node, unionVar); - i++; - } - - @Override - public void meet(Join node) throws RuntimeException { - super.meet(node); - - TupleExpr leftArg = node.getLeftArg(); - TupleExpr rightArg = node.getRightArg(); - List<String> left_names = exprToNames.get(leftArg); - List<String> right_names = exprToNames.get(rightArg); - - Set<String> joinNames = new HashSet<String>(left_names); - joinNames.retainAll(right_names); //intersection, this is what I join on - //SEC = join FIR by (MEMB_OF::ugrad, SUBORG_J::univ), UGRADDEG by (ugrad, univ); - StringBuilder joinStr = new StringBuilder(); - joinStr.append("("); - boolean first = true; - for (String name : joinNames) { //TODO: Make this a utility method - if (!first) { - joinStr.append(","); - } - first = false; - joinStr.append(name); - } - joinStr.append(")"); - - String left_var = exprToVar.get(leftArg); - String right_var = exprToVar.get(rightArg); - if (joinStr.length() <= 2) { - //no join params, need to cross - pigScriptBuilder.append(i).append(" = cross ").append(left_var).append(", ").append(right_var).append(";\n"); - } else { - //join - pigScriptBuilder.append(i).append(" = join ").append(left_var); - pigScriptBuilder.append(" by ").append(joinStr); - pigScriptBuilder.append(", ").append(right_var); - pigScriptBuilder.append(" by ").append(joinStr); - pigScriptBuilder.append(";\n"); - - } - - String joinVarStr 
= i + ""; - i++; - // D = foreach C GENERATE A::subj AS subj:chararray, A::A_p AS p:chararray; - String forEachVarStr = i + ""; - pigScriptBuilder.append(i).append(" = foreach ").append(joinVarStr).append(" GENERATE "); - Map<String, String> nameToJoinName = new HashMap<String, String>(); - for (String name : left_names) { - varToSet.put(name, forEachVarStr); - addToExprToNames(node, name); - nameToJoinName.put(name, left_var + "::" + name); - } - for (String name : right_names) { - varToSet.put(name, forEachVarStr); - addToExprToNames(node, name); - nameToJoinName.put(name, right_var + "::" + name); - } - - first = true; - for (Map.Entry entry : nameToJoinName.entrySet()) { - if (!first) { - pigScriptBuilder.append(","); - } - first = false; - pigScriptBuilder.append(entry.getValue()).append(" AS ").append(entry.getKey()).append(":chararray "); - } - pigScriptBuilder.append(";\n"); - - exprToVar.put(node, forEachVarStr); - i++; - } - - @Override - public void meet(Projection node) throws RuntimeException { - super.meet(node); - ProjectionElemList list = node.getProjectionElemList(); - String set = null; - StringBuilder projList = new StringBuilder(); - boolean first = true; - //TODO: we do not support projections from multiple pig statements yet - for (String name : list.getTargetNames()) { - set = varToSet.get(name); //TODO: overwrite - if (set == null) { - throw new IllegalArgumentException("Have not found any pig logic for name[" + name + "]"); - } - if (!first) { - projList.append(","); - } - first = false; - projList.append(name); - } - if (set == null) - throw new IllegalArgumentException(""); //TODO: Fill this - //SUBORG = FOREACH SUBORG_L GENERATE dept, univ; - pigScriptBuilder.append("PROJ = FOREACH ").append(set).append(" GENERATE ").append(projList.toString()).append(";\n"); - } - - @Override - public void meet(Slice node) throws RuntimeException { - super.meet(node); - long limit = node.getLimit(); - //PROJ = LIMIT PROJ 10; - 
pigScriptBuilder.append("PROJ = LIMIT PROJ ").append(limit).append(";\n"); - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public String getUser() { - return user; - } - - public void setUser(String user) { - this.user = user; - } - - public String getZk() { - return zk; - } - - public void setZk(String zk) { - this.zk = zk; - } - - public String getInstance() { - return instance; - } - - public void setInstance(String instance) { - this.instance = instance; - } - - public String getTablePrefix() { - return tablePrefix; - } - - public void setTablePrefix(String tablePrefix) { - this.tablePrefix = tablePrefix; - } - - public String getPigScript() { - return pigScriptBuilder.toString(); - } - - protected String getVarValue(Var var) { - if (var == null) { - return ""; - } else { - Value value = var.getValue(); - if (value == null) { - return ""; - } - if (value instanceof URI) { - return "<" + value.stringValue() + ">"; - } - if (value instanceof Literal) { - Literal lit = (Literal) value; - if (lit.getDatatype() == null) { - //string - return "\\'" + value.stringValue() + "\\'"; - } - } - return value.stringValue(); - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/StatementPatternStorage.java ---------------------------------------------------------------------- diff --git a/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/StatementPatternStorage.java b/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/StatementPatternStorage.java deleted file mode 100644 index f9acb70..0000000 --- a/pig/cloudbase.pig/src/main/java/mvm/rya/cloudbase/pig/StatementPatternStorage.java +++ /dev/null @@ -1,278 +0,0 @@ -package mvm.rya.cloudbase.pig; - -import cloudbase.core.client.ZooKeeperInstance; -import cloudbase.core.client.mock.MockInstance; -import cloudbase.core.data.Key; -import 
cloudbase.core.data.Range;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteStreams;
import mvm.rya.api.RdfCloudTripleStoreConstants;
import mvm.rya.api.RdfCloudTripleStoreUtils;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaType;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.persist.RyaDAOException;
import mvm.rya.api.query.strategy.ByteRange;
import mvm.rya.api.query.strategy.TriplePatternStrategy;
import mvm.rya.api.resolver.RdfToRyaConversions;
import mvm.rya.api.resolver.RyaContext;
import mvm.rya.api.resolver.triple.TripleRow;
import mvm.rya.cloudbase.CloudbaseRdfConfiguration;
import mvm.rya.cloudbase.CloudbaseRyaDAO;
import mvm.rya.rdftriplestore.inference.InferenceEngine;
import mvm.rya.rdftriplestore.inference.InferenceEngineException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.openrdf.model.Resource;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.model.vocabulary.RDF;
import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.Var;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
import org.openrdf.query.parser.ParsedQuery;
import org.openrdf.query.parser.QueryParser;
import org.openrdf.query.parser.sparql.SPARQLParser;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;

import static mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;

/**
 * Storage that scans the Rya triples matching a single statement pattern and
 * hands each one to Pig as a 4-field tuple: subject, predicate, object, context.
 * <p>
 * The pattern is taken from the location URI, for example:
 * <pre>
 * cloudbase://tablePrefix?instance=myinstance&amp;user=root&amp;password=secret&amp;zookeepers=127.0.0.1:2181&amp;auths=PRIVATE,PUBLIC&amp;subject=a&amp;predicate=b&amp;object=c&amp;context=c&amp;infer=true
 * </pre>
 * {@code subject}, {@code predicate} and {@code object} default to the SPARQL
 * variables {@code ?s}, {@code ?p} and {@code ?o}; {@code infer} defaults to
 * {@code true}.
 * <p>
 * Connection settings, the range list and the record reader are inherited
 * state that is declared outside this class.
 */
public class StatementPatternStorage extends CloudbaseStorage {
    private static final Log logger = LogFactory.getLog(StatementPatternStorage.class);

    /** Table layout chosen for the statement pattern; set while the pattern is parsed. */
    protected TABLE_LAYOUT layout;
    protected String subject = "?s";
    protected String predicate = "?p";
    protected String object = "?o";
    protected String context;

    /** Bound values of the parsed pattern; {@code null} where the position is a variable. */
    private Value subjectValue;
    private Value predicateValue;
    private Value objectValue;

    private final RyaContext ryaContext = RyaContext.getInstance();

    /**
     * whether to turn inferencing on or off
     */
    private boolean infer = true;

    public StatementPatternStorage() {
    }

    /**
     * Returns the constant bound to {@code var}, or {@code null} when it is an unbound variable.
     */
    private Value getValue(Var var) {
        return var.hasValue() ? var.getValue() : null;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        super.setLocation(location, job);
    }

    /**
     * Lets the parent class process the URI, then turns the configured statement
     * pattern (and, when inferencing is on, its inferred variants) into scan ranges
     * and resolves the concrete table name from the selected layout.
     *
     * @throws IllegalArgumentException if the pattern produced no layout or no range
     * @throws IOException              if the pattern cannot be parsed or inference lookup fails
     */
    @Override
    protected void setLocationFromUri(String uri, Job job) throws IOException {
        super.setLocationFromUri(uri, job);
        addStatementPatternRange(subject, predicate, object, context);
        if (infer) {
            // At this point 'table' still holds the prefix; it is only resolved to a table name below.
            addInferredRanges(table, job);
        }

        if (layout == null || ranges.size() == 0) {
            throw new IllegalArgumentException("Range and/or layout is null. Check the query");
        }
        table = RdfCloudTripleStoreUtils.layoutPrefixToTable(layout, table);
        tableName = new Text(table);
    }

    /**
     * Picks up the statement-pattern specific query parameters; any other key is ignored here.
     *
     * @param pair a two element array holding the parameter name and its value
     */
    @Override
    protected void addLocationFromUriPart(String[] pair) {
        String key = pair[0];
        if ("subject".equals(key)) {
            this.subject = pair[1];
        } else if ("predicate".equals(key)) {
            this.predicate = pair[1];
        } else if ("object".equals(key)) {
            this.object = pair[1];
        } else if ("context".equals(key)) {
            this.context = pair[1];
        } else if ("infer".equals(key)) {
            this.infer = Boolean.parseBoolean(pair[1]);
        }
    }

    /**
     * Builds a one-pattern SPARQL query from the given terms, parses it, and registers
     * the scan range (plus a context column, when one is bound) for the pattern found.
     * Also records the pattern's bound values and the table layout on this instance.
     *
     * @param subj subject term in SPARQL syntax (a variable or an IRI)
     * @param pred predicate term in SPARQL syntax
     * @param obj  object term in SPARQL syntax
     * @param ctxt graph term in SPARQL syntax, or {@code null} for no GRAPH clause
     * @throws IOException if the generated query is malformed or the range cannot be created
     */
    protected void addStatementPatternRange(String subj, String pred, String obj, String ctxt) throws IOException {
        logger.info("Adding statement pattern[subject:" + subj + ", predicate:" + pred + ", object:" + obj + ", context:" + ctxt + "]");
        StringBuilder sparqlBuilder = new StringBuilder();
        sparqlBuilder.append("select * where {\n");
        if (ctxt != null) {
            // With a context the pattern is wrapped in a GRAPH clause:
            //   select * where {
            //     GRAPH ?g {
            //       <http://www.example.org/exampleDocument#Monica> ?p ?o.
            //     }
            //   }
            sparqlBuilder.append("GRAPH ").append(ctxt).append(" {\n");
        }
        sparqlBuilder.append(subj).append(" ").append(pred).append(" ").append(obj).append(".\n");
        if (ctxt != null) {
            sparqlBuilder.append("}\n");
        }
        sparqlBuilder.append("}\n");
        String sparql = sparqlBuilder.toString();

        if (logger.isDebugEnabled()) {
            logger.debug("Sparql statement range[" + sparql + "]");
        }

        QueryParser parser = new SPARQLParser();
        final ParsedQuery parsedQuery;
        try {
            parsedQuery = parser.parseQuery(sparql, null);
        } catch (MalformedQueryException e) {
            throw new IOException(e);
        }
        parsedQuery.getTupleExpr().visitChildren(new QueryModelVisitorBase<IOException>() {
            @Override
            public void meet(StatementPattern node) throws IOException {
                subjectValue = getValue(node.getSubjectVar());
                predicateValue = getValue(node.getPredicateVar());
                objectValue = getValue(node.getObjectVar());

                Map.Entry<TABLE_LAYOUT, Range> layoutAndRange = createRange(subjectValue, predicateValue, objectValue);
                layout = layoutAndRange.getKey();
                addRange(layoutAndRange.getValue());

                Var contextVar = node.getContextVar();
                if (contextVar != null && contextVar.getValue() != null) {
                    addColumnPair(contextVar.getValue().stringValue(), "");
                }
            }
        });
    }

    /**
     * Asks the Rya context for the strategy that serves the given pattern and converts
     * the byte range it defines into a scan range.
     *
     * @return the table layout to scan together with the range; an SPO layout with an
     *         unbounded range when no strategy is available
     */
    protected Map.Entry<TABLE_LAYOUT, Range> createRange(Value s_v, Value p_v, Value o_v) throws IOException {
        RyaURI subjectRya = RdfToRyaConversions.convertResource((Resource) s_v);
        RyaURI predicateRya = RdfToRyaConversions.convertURI((URI) p_v);
        RyaType objectRya = RdfToRyaConversions.convertValue(o_v);
        TriplePatternStrategy strategy = ryaContext.retrieveStrategy(subjectRya, predicateRya, objectRya, null);
        if (strategy == null) {
            return new RdfCloudTripleStoreUtils.CustomEntry<TABLE_LAYOUT, Range>(TABLE_LAYOUT.SPO, new Range());
        }
        Map.Entry<TABLE_LAYOUT, ByteRange> entry = strategy.defineRange(subjectRya, predicateRya, objectRya, null, null);
        ByteRange byteRange = entry.getValue();
        Range range = new Range(new Text(byteRange.getStart()), new Text(byteRange.getEnd()));
        return new RdfCloudTripleStoreUtils.CustomEntry<TABLE_LAYOUT, Range>(entry.getKey(), range);
    }

    /**
     * Adds one extra scan range per inferred relationship of the current pattern:
     * for an {@code rdf:type} predicate, one range per value returned by the
     * subClassOf lookup on the object; for any other bound predicate, one range per
     * value returned by the subPropertyOf lookup on the predicate.
     * <p>
     * A DAO and an inference engine are created for the lookup and are always
     * destroyed before this method returns.
     *
     * @param tablePrefix table prefix handed to the Rya configuration
     * @param job         job whose configuration seeds the Rya configuration
     * @throws IOException if connecting, initialising, the lookup or the cleanup fails
     */
    protected void addInferredRanges(String tablePrefix, Job job) throws IOException {
        logger.info("Adding inferences to statement pattern[subject:" + subjectValue + ", predicate:" + predicateValue + ", object:" + objectValue + "]");
        //inference engine
        CloudbaseRyaDAO ryaDAO = new CloudbaseRyaDAO();
        CloudbaseRdfConfiguration rdfConf = new CloudbaseRdfConfiguration(job.getConfiguration());
        rdfConf.setTablePrefix(tablePrefix);
        ryaDAO.setConf(rdfConf);
        InferenceEngine inferenceEngine = new InferenceEngine();
        inferenceEngine.setConf(rdfConf);
        inferenceEngine.setRyaDAO(ryaDAO);
        inferenceEngine.setSchedule(false);
        try {
            if (!mock) {
                ryaDAO.setConnector(new ZooKeeperInstance(inst, zookeepers).getConnector(user, password.getBytes()));
            } else {
                ryaDAO.setConnector(new MockInstance(inst).getConnector(user, password.getBytes()));
            }

            ryaDAO.init();
            inferenceEngine.init();
            //is it subclassof or subpropertyof
            if (RDF.TYPE.equals(predicateValue)) {
                // The lookup needs a URI. A variable (null) or literal object is skipped
                // instead of being cast blindly.
                if (objectValue instanceof URI) {
                    Collection<URI> parents = inferenceEngine.findParents(inferenceEngine.getSubClassOfGraph(), (URI) objectValue);
                    if (parents != null) {
                        //the pattern itself was already added by addStatementPatternRange
                        for (URI parent : parents) {
                            Range range = createRange(subjectValue, predicateValue, parent).getValue();
                            if (logger.isDebugEnabled()) {
                                logger.debug("Found subClassOf relationship [type:" + objectValue + " is subClassOf:" + parent + "]");
                            }
                            addRange(range);
                        }
                    }
                }
            } else if (predicateValue != null) {
                //subpropertyof check
                Set<URI> parents = inferenceEngine.findParents(inferenceEngine.getSubPropertyOfGraph(), (URI) predicateValue);
                if (parents != null) {
                    for (URI parent : parents) {
                        Range range = createRange(subjectValue, parent, objectValue).getValue();
                        if (logger.isDebugEnabled()) {
                            logger.debug("Found subPropertyOf relationship [type:" + predicateValue + " is subPropertyOf:" + parent + "]");
                        }
                        addRange(range);
                    }
                }
            }
        } catch (Exception e) {
            throw new IOException(e);
        } finally {
            // Nested so that a failure while destroying the engine cannot skip the DAO cleanup.
            try {
                inferenceEngine.destroy();
            } catch (InferenceEngineException e) {
                throw new IOException(e);
            } finally {
                try {
                    ryaDAO.destroy();
                } catch (RyaDAOException e) {
                    throw new IOException(e);
                }
            }
        }
    }

    /**
     * Reads the next key from the record reader, deserializes it into a Rya statement
     * using the current layout, and returns it as a 4-field tuple.
     *
     * @return a tuple of (subject, predicate, object, context) where context is
     *         {@code null} if the statement has none; {@code null} when the reader is exhausted
     * @throws IOException wrapping any failure while reading or deserializing
     */
    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;
            }
            Key key = (Key) reader.getCurrentKey();
            TripleRow row = new TripleRow(key.getRow().getBytes(),
                    key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes());
            RyaStatement ryaStatement = ryaContext.deserializeTriple(layout, row);

            Tuple tuple = TupleFactory.getInstance().newTuple(4);
            tuple.set(0, ryaStatement.getSubject().getData());
            tuple.set(1, ryaStatement.getPredicate().getData());
            tuple.set(2, ryaStatement.getObject().getData());
            tuple.set(3, (ryaStatement.getContext() != null) ? ryaStatement.getContext().getData() : null);
            return tuple;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller's thread even though it is wrapped.
                Thread.currentThread().interrupt();
            }
            throw new IOException(e);
        }
    }
}