This is an automated email from the ASF dual-hosted git repository. djkevincr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/gora.git
The following commit(s) were added to refs/heads/master by this push: new 392b264 GORA-546 Hazelcast Jet execution engine support (#175) 392b264 is described below commit 392b264fdb0da86556a632be5a12d41da3512f49 Author: Lahiru Jayasekara <mlpjayasek...@gmail.com> AuthorDate: Mon Mar 23 15:37:47 2020 +0530 GORA-546 Hazelcast Jet execution engine support (#175) * Add hazelcast jet source initial impls * Add HazelcastJet Source Vertex with distributed execution support * Rename class name * Improve test cases * Add gora-jet sink connector implementations * Refactor code and moved into gora-jet module * Fix Indentation to 2 spaces * Fix injecting datastore key into jet source * Fix JetSource data partitioning * Add Apache License * Remove printStackTrace * Add java doc comments * Fix test cases via hbase testing utilities * Remove printStackTrace * Resolve hazelcast dependency conflict * Resolve issues from review * Add LogAnalyticsJet tutorial class * Move gora-jet module up in pom.xml * Change snapshot version in gora-jet module * Add license header --- gora-jet/conf/gora-aerospike-mapping.xml | 44 + gora-jet/conf/gora-cassandra-mapping.xml | 51 ++ gora-jet/conf/gora-couchdb-mapping.xml | 41 + gora-jet/conf/gora-hbase-mapping.xml | 55 ++ gora-jet/conf/gora-solr-mapping.xml | 43 + gora-jet/conf/gora-sql-mapping.xml | 43 + gora-jet/conf/gora.properties | 66 ++ gora-jet/conf/hazelcast-client.xml | 31 + gora-jet/conf/hazelcast.xml | 34 + gora-jet/pom.xml | 131 +++ .../main/java/org/apache/gora/jet/JetEngine.java | 51 ++ .../org/apache/gora/jet/JetInputOutputFormat.java | 50 ++ .../src/main/java/org/apache/gora/jet/JetSink.java | 91 +++ .../main/java/org/apache/gora/jet/JetSource.java | 109 +++ gora-jet/src/test/avro/pageview.json | 15 + gora-jet/src/test/avro/resultPageView.json | 10 + .../src/test/java/org/apache/gora/jet/JetTest.java | 150 ++++ .../org/apache/gora/jet/generated/Pageview.java | 881 +++++++++++++++++++++ .../apache/gora/jet/generated/ResultPageView.java 
| 468 +++++++++++ gora-tutorial/pom.xml | 5 + .../apache/gora/tutorial/log/LogAnalyticsJet.java | 108 +++ pom.xml | 16 +- 22 files changed, 2492 insertions(+), 1 deletion(-) diff --git a/gora-jet/conf/gora-aerospike-mapping.xml b/gora-jet/conf/gora-aerospike-mapping.xml new file mode 100644 index 0000000..5506880 --- /dev/null +++ b/gora-jet/conf/gora-aerospike-mapping.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<!-- + Gora Mapping file for Aerospike Backend +--> +<gora-otd> + + <policy name="write" gen="NONE" recordExists="UPDATE" commitLevel="COMMIT_ALL" durableDelete="false"/> + <policy name="read" priority="DEFAULT" consistencyLevel="CONSISTENCY_ONE" replica="SEQUENCE" maxRetries="2"/> + + <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" set="AccessLog" namespace = "test"> + <field name="url" bin="url"/> + <field name="timestamp" bin="timestamp"/> + <field name="ip" bin="ip" /> + <field name="httpMethod" bin="httpMethod"/> + <field name="httpStatusCode" bin="httpStatusCode"/> + <field name="responseSize" bin="responseSize"/> + <field name="referrer" bin="referrer"/> + <field name="userAgent" bin="userAgent"/> + </class> + + <class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" set="Metrics" namespace = "test"> + <field name="metricDimension" bin="metricDimension"/> + <field name="timestamp" bin="ts"/> + <field name="metric" bin="metric"/> + </class> + +</gora-otd> diff --git a/gora-jet/conf/gora-cassandra-mapping.xml b/gora-jet/conf/gora-cassandra-mapping.xml new file mode 100644 index 0000000..bb0275f --- /dev/null +++ b/gora-jet/conf/gora-cassandra-mapping.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + Gora Mapping file for Cassandra Backend +--> +<gora-otd> + + <keyspace name="Pageview" cluster="Test Cluster" host="localhost"> + <family name="common"/> + <family name="http"/> + <family name="misc"/> + </keyspace> + + <keyspace name="Metrics" cluster="Test Cluster" host="localhost"> + <family name="common"/> + </keyspace> + + <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" keyspace="Pageview"> + <field name="url" family="common" qualifier="url"/> + <field name="timestamp" family="common" qualifier="timestamp"/> + <field name="ip" family="common" qualifier="ip" /> + <field name="httpMethod" family="http" qualifier="httpMethod"/> + <field name="httpStatusCode" family="http" qualifier="httpStatusCode"/> + <field name="responseSize" family="http" qualifier="responseSize"/> + <field name="referrer" family="misc" qualifier="referrer"/> + <field name="userAgent" family="misc" qualifier="userAgent"/> + </class> + + <class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" keyspace="Metrics"> + <field name="metricDimension" family="common" qualifier="metricDimension"/> + <field name="timestamp" family="common" qualifier="ts"/> + <field name="metric" family="common" qualifier="metric"/> + </class> + +</gora-otd> diff --git a/gora-jet/conf/gora-couchdb-mapping.xml b/gora-jet/conf/gora-couchdb-mapping.xml new file mode 100644 index 0000000..2cbee77 --- /dev/null +++ b/gora-jet/conf/gora-couchdb-mapping.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + Gora Mapping file for CouchDB Backend +--> +<gora-otd> + + <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" document="AccessLog"> + <field name="url"/> + <field name="timestamp"/> + <field name="ip"/> + <field name="httpMethod"/> + <field name="httpStatusCode"/> + <field name="responseSize"/> + <field name="referrer"/> + <field name="userAgent"/> + </class> + + <class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" document="Metrics"> + <field name="metricDimension"/> + <field name="timestamp"/> + <field name="metric"/> + </class> + +</gora-otd> diff --git a/gora-jet/conf/gora-hbase-mapping.xml b/gora-jet/conf/gora-hbase-mapping.xml new file mode 100644 index 0000000..9722144 --- /dev/null +++ b/gora-jet/conf/gora-hbase-mapping.xml @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + Gora Mapping file for HBase Backend +--> +<gora-otd> + <table name="Pageview"> <!-- optional descriptors for tables --> + <family name="common"/> <!-- This can also have params like compression, bloom filters --> + <family name="http"/> + <family name="misc"/> + </table> + + <table name="ResultPageView"> <!-- optional descriptors for tables --> + <family name="common"/> <!-- This can also have params like compression, bloom filters --> + </table> + + <class name="org.apache.gora.jet.generated.Pageview" keyClass="java.lang.Long" table="AccessLog"> + <field name="url" family="common" qualifier="url"/> + <field name="timestamp" family="common" qualifier="timestamp"/> + <field name="ip" family="common" qualifier="ip" /> + <field name="httpMethod" family="http" qualifier="httpMethod"/> + <field name="httpStatusCode" family="http" qualifier="httpStatusCode"/> + <field name="responseSize" family="http" qualifier="responseSize"/> + <field name="referrer" family="misc" qualifier="referrer"/> + <field name="userAgent" family="misc" qualifier="userAgent"/> + </class> + + <class name="org.apache.gora.jet.generated.MetricDatum" keyClass="java.lang.String" table="Metrics"> + <field name="metricDimension" family="common" qualifier="metricDimension"/> + <field name="timestamp" family="common" qualifier="ts"/> + <field name="metric" family="common" qualifier="metric"/> + </class> + + <class name="org.apache.gora.jet.generated.ResultPageView" keyClass="java.lang.Long" table="Results"> + <field name="url" family="common" qualifier="url"/> + <field name="timestamp" 
family="common" qualifier="timestamp"/> + <field name="ip" family="common" qualifier="ip" /> + </class> +</gora-otd> diff --git a/gora-jet/conf/gora-solr-mapping.xml b/gora-jet/conf/gora-solr-mapping.xml new file mode 100644 index 0000000..5fe10fb --- /dev/null +++ b/gora-jet/conf/gora-solr-mapping.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<!-- + Gora Mapping file for Solr Backend +--> +<gora-otd> + <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" table="AccessLog"> + <primarykey column="line"/> + <field name="url" column="url"/> + <field name="timestamp" column="timestamp"/> + <field name="ip" column="ip"/> + <field name="httpMethod" column="httpMethod"/> + <field name="httpStatusCode" column="httpStatusCode"/> + <field name="responseSize" column="responseSize"/> + <field name="referrer" column="referrer"/> + <field name="userAgent" column="userAgent"/> + </class> + + <class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" table="Metrics"> + <primarykey column="id"/> + <field name="metricDimension" column="metricDimension"/> + <field name="timestamp" column="ts"/> + <field name="metric" column="metric"/> + </class> + +</gora-otd> + diff --git a/gora-jet/conf/gora-sql-mapping.xml b/gora-jet/conf/gora-sql-mapping.xml new file mode 100644 index 0000000..90f1cfc --- /dev/null +++ b/gora-jet/conf/gora-sql-mapping.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License.
+--> + +<!-- + Gora Mapping file for SQL Backend +--> +<gora-otd> + <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" table="AccessLog"> + <primarykey column="line"/> + <field name="url" column="url" length="512" primarykey="true"/> + <field name="timestamp" column="timestamp"/> + <field name="ip" column="ip" length="16"/> + <field name="httpMethod" column="httpMethod" length="6"/> + <field name="httpStatusCode" column="httpStatusCode"/> + <field name="responseSize" column="responseSize"/> + <field name="referrer" column="referrer" length="512"/> + <field name="userAgent" column="userAgent" length="512"/> + </class> + + <class name="org.apache.gora.tutorial.log.generated.MetricDatum" keyClass="java.lang.String" table="Metrics"> + <primarykey column="id" length="512"/> + <field name="metricDimension" column="metricDimension" length="512"/> + <field name="timestamp" column="ts"/> + <field name="metric" column="metric"/> + </class> + +</gora-otd> + diff --git a/gora-jet/conf/gora.properties b/gora-jet/conf/gora.properties new file mode 100644 index 0000000..ddce0ed --- /dev/null +++ b/gora-jet/conf/gora.properties @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +##gora.datastore.default is the default datastore implementation to use +##if it is not passed to the DataStoreFactory#createDataStore() method. +gora.datastore.default=org.apache.gora.hbase.store.HBaseStore +#gora.datastore.default=org.apache.gora.couchdb.store.CouchDBStore +#gora.datastore.default=org.apache.gora.cassandra.store.CassandraStore +#gora.datastore.default=org.apache.gora.solr.store.SolrStore +#gora.datastore.default=org.apache.gora.aerospike.store.AerospikeStore + +#gora.datastore.default=org.apache.gora.avro.store.AvroStore +#gora.avrostore.input.path=hdfs://localhost:9000/gora.avrostore.test.input +#gora.avrostore.output.path=hdfs://localhost:9000/gora.avrostore.test.output +#gora.avrostore.codec.type=JSON || BINARY + +##whether to create the schema automatically if it does not exist. +gora.datastore.autocreateschema=true + +##Cassandra properties for gora-cassandra module using Cassandra +#gora.cassandrastore.servers=localhost:9160 + +##JDBC properties for gora-sql module using HSQL +gora.sqlstore.jdbc.driver=org.hsqldb.jdbcDriver +##HSQL jdbc connection as persistent in-process database +gora.sqlstore.jdbc.url=jdbc:hsqldb:file:./hsql-data + +##HSQL jdbc connection as network server +#gora.sqlstore.jdbc.url=jdbc:hsqldb:hsql://localhost/goratest + +##JDBC properties for gora-sql module using MySQL +#gora.sqlstore.jdbc.driver=com.mysql.jdbc.Driver +#gora.sqlstore.jdbc.url=jdbc:mysql://localhost:3306/goratest +#gora.sqlstore.jdbc.user=root +#gora.sqlstore.jdbc.password= + +gora.solrstore.solr.url=http://localhost:8983/solr +gora.solrstore.solr.commitwithin=0 +gora.solrstore.solr.batchsize=100 +# set which Solrj server impl you wish to use +# cloud, concurrent, http, loadbalance +gora.solrstore.solr.solrjserver=http + +#JCache dataStore properties +gora.cache.datastore.default=org.apache.gora.jcache.store.JCacheStore +gora.datastore.jcache.provider=com.hazelcast.cache.impl.HazelcastServerCachingProvider
+#gora.datastore.jcache.provider=com.hazelcast.client.cache.impl.HazelcastClientCachingProvider + +##Aerospike dataStore properties +#gora.aerospikestore.server.ip=localhost +#gora.aerospikestore.server.port=3000 +#gora.aerospikestore.server.username= +#gora.aerospikestore.server.password= diff --git a/gora-jet/conf/hazelcast-client.xml b/gora-jet/conf/hazelcast-client.xml new file mode 100644 index 0000000..93f21e2 --- /dev/null +++ b/gora-jet/conf/hazelcast-client.xml @@ -0,0 +1,31 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + Hazelcast client side cache provider configuration. 
+--> + +<hazelcast-client xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.hazelcast.com/schema/client-config + http://www.hazelcast.com/schema/client-config/hazelcast-client-config-3.6.xsd" + xmlns="http://www.hazelcast.com/schema/client-config"> + <network> + <cluster-members> + <address>127.0.0.1</address> + </cluster-members> + </network> +</hazelcast-client> diff --git a/gora-jet/conf/hazelcast.xml b/gora-jet/conf/hazelcast.xml new file mode 100755 index 0000000..9838710 --- /dev/null +++ b/gora-jet/conf/hazelcast.xml @@ -0,0 +1,34 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + Hazelcast server side cache provider configuration. 
+--> + +<hazelcast xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.hazelcast.com/schema/config + http://www.hazelcast.com/schema/config/hazelcast-config-3.6.xsd" + xmlns="http://www.hazelcast.com/schema/config"> + <network> + <join> + <multicast enabled="false"/> + <tcp-ip enabled="true"> + <member>127.0.0.1</member> + </tcp-ip> + </join> + </network> +</hazelcast> diff --git a/gora-jet/pom.xml b/gora-jet/pom.xml new file mode 100644 index 0000000..36b1734 --- /dev/null +++ b/gora-jet/pom.xml @@ -0,0 +1,131 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.gora</groupId> + <artifactId>gora</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>../</relativePath> + </parent> + + <artifactId>gora-jet</artifactId> + <packaging>bundle</packaging> + + <name>Apache Gora :: Jet</name> + <url>http://gora.apache.org</url> + <description>Jet -> Gora -> Jet Sink and Source connectors</description> + <inceptionYear>2010</inceptionYear> + <organization> + <name>The Apache Software Foundation</name> + <url>http://www.apache.org/</url> + </organization> + + <properties> + <osgi.import>*</osgi.import> + <osgi.export>org.apache.gora.jet*;version="${project.version}";-noimport:=true</osgi.export> + </properties> + + <build> + <directory>target</directory> + <outputDirectory>target/classes</outputDirectory> + <finalName>${project.artifactId}-${project.version}</finalName> + <testOutputDirectory>target/test-classes</testOutputDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <sourceDirectory>src/main/java</sourceDirectory> + <resources> + <resource> + <directory>${basedir}/src/main/resources</directory> + </resource> + <resource> + 
<directory>${basedir}/conf</directory> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>${build-helper-maven-plugin.version}</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/examples/java</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + </dependency> + + <!-- Gora Internal Dependencies --> + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-core</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.gora</groupId> + <artifactId>gora-hbase</artifactId> + </dependency> + + <!-- jet Dependencies --> + <dependency> + <groupId>com.hazelcast.jet</groupId> + <artifactId>hazelcast-jet</artifactId> + </dependency> + + <!-- Logging Dependencies --> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <exclusions> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- Testing Dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>2.5.2</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-testing-util</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git 
a/gora-jet/src/main/java/org/apache/gora/jet/JetEngine.java b/gora-jet/src/main/java/org/apache/gora/jet/JetEngine.java new file mode 100644 index 0000000..004b561 --- /dev/null +++ b/gora-jet/src/main/java/org/apache/gora/jet/JetEngine.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.jet; + +import com.hazelcast.jet.pipeline.BatchSource; +import com.hazelcast.jet.pipeline.Sink; +import com.hazelcast.jet.pipeline.Sinks; +import com.hazelcast.jet.pipeline.Sources; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.store.DataStore; + +/** + * Core class which handles Gora - Jet Engine integration. 
+ */ +public class JetEngine<KeyIn, ValueIn extends PersistentBase, KeyOut, ValueOut extends PersistentBase> { + /* + * NOTE(review): the three fields below are static, so they are shared by every JetEngine + * instance in the JVM; each createDataSource/createDataSink call overwrites the store (and + * query) that previously created sources/sinks will use when their job runs. Static fields + * are also not serialized with JetSource/JetSink, so processors running on other cluster + * members presumably see null here -- TODO confirm for multi-member clusters. + */ + /** + * Output store written by the sink processors; assigned by {@link #createDataSink(DataStore)}. + */ + static DataStore dataOutStore; + /** + * Input store for the source (presumably read by {@link JetSource}); assigned by both + * {@code createDataSource} overloads. + */ + static DataStore dataInStore; + /** + * Query for the source (presumably run by {@link JetSource} against {@link #dataInStore}); + * assigned by both {@code createDataSource} overloads. + */ + static Query query; + + /** + * Creates a Jet batch source named "gora-jet-source" over {@code dataOutStore}, using the + * default query returned by {@code dataOutStore.newQuery()}. + * + * @param dataOutStore the store to read from (despite its name, this is the <em>input</em> + * store: it is assigned to {@link #dataInStore}) + * @return a batch source of {@link JetInputOutputFormat} key/value items + */ + public BatchSource<JetInputOutputFormat<KeyIn, ValueIn>> createDataSource(DataStore<KeyIn, ValueIn> dataOutStore) { + return createDataSource(dataOutStore, dataOutStore.newQuery()); + } + + /** + * Creates a Jet batch source named "gora-jet-source" for {@code query} over + * {@code dataOutStore}. The store and query are kept in the static {@link #dataInStore} and + * {@link #query} fields rather than inside the returned source. + * + * @param dataOutStore the store to read from (despite its name, this is the <em>input</em> + * store: it is assigned to {@link #dataInStore}) + * @param query the query selecting the records to read + * @return a batch source of {@link JetInputOutputFormat} key/value items + */ + public BatchSource<JetInputOutputFormat<KeyIn, ValueIn>> createDataSource(DataStore<KeyIn, ValueIn> dataOutStore, + Query<KeyIn, ValueIn> query) { + JetEngine.dataInStore = dataOutStore; + JetEngine.query = query; + return Sources.batchFromProcessor("gora-jet-source", new JetSource<KeyIn, ValueIn>()); + } + + /** + * Creates a Jet sink named "gora-jet-sink" whose processors put each incoming + * {@link JetInputOutputFormat} key/value pair into {@code dataOutStore} and flush the store + * when they close. The store is kept in the static {@link #dataOutStore} field rather than + * inside the returned sink. + * + * @param dataOutStore the store to write into + * @return a sink accepting {@link JetInputOutputFormat} key/value items + */ + public Sink<JetInputOutputFormat<KeyOut, ValueOut>> createDataSink(DataStore<KeyOut, ValueOut> dataOutStore) { + JetEngine.dataOutStore = dataOutStore; + return Sinks.fromProcessor("gora-jet-sink", new JetSink<KeyOut, ValueOut>()); + } +} diff --git a/gora-jet/src/main/java/org/apache/gora/jet/JetInputOutputFormat.java b/gora-jet/src/main/java/org/apache/gora/jet/JetInputOutputFormat.java new file mode 100644 index 0000000..b4652be --- /dev/null +++ b/gora-jet/src/main/java/org/apache/gora/jet/JetInputOutputFormat.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.jet; + +import org.apache.gora.persistency.impl.PersistentBase; + +/** + * Mutable key/value pair exchanged between Gora data stores and Jet pipelines: sources + * created by {@code JetEngine.createDataSource} produce items of this type, and the Gora Jet + * sink consumes them, putting each {@link #getValue() value} into its data store under + * {@link #getKey() key}. + * + * <p>NOTE(review): this class does not implement {@link java.io.Serializable}; items that + * travel between cluster members presumably need to be serializable -- TODO confirm. + * + * @param <KeyOut> type of the record key + * @param <ValueOut> type of the persistent record value + */ +public class JetInputOutputFormat<KeyOut, ValueOut extends PersistentBase> { + /** Record key. */ + KeyOut key; + /** Persistent record value. */ + ValueOut value; + + /** + * Creates a pair from the given key and value. + * + * @param key the record key + * @param value the persistent record value + */ + public JetInputOutputFormat(KeyOut key, ValueOut value) { + this.key = key; + this.value = value; + } + + /** @return the record key */ + public KeyOut getKey() { + return key; + } + + /** Replaces the record key. */ + public void setKey(KeyOut key) { + this.key = key; + } + + /** @return the persistent record value */ + public ValueOut getValue() { + return value; + } + + /** Replaces the persistent record value. */ + public void setValue(ValueOut value) { + this.value = value; + } +} diff --git a/gora-jet/src/main/java/org/apache/gora/jet/JetSink.java b/gora-jet/src/main/java/org/apache/gora/jet/JetSink.java new file mode 100644 index 0000000..dcdb28e --- /dev/null +++ b/gora-jet/src/main/java/org/apache/gora/jet/JetSink.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.gora.jet; + +import com.hazelcast.jet.core.AbstractProcessor; +import com.hazelcast.jet.core.ProcessorMetaSupplier; +import com.hazelcast.jet.core.ProcessorSupplier; +import com.hazelcast.nio.Address; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.util.GoraException; + +import javax.annotation.Nonnull; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.IntStream.range; + +/** + * Jet {@link ProcessorMetaSupplier} behind the Gora Jet sink. For every cluster member it + * supplies {@code processorCount} {@link SinkProcessor} instances, each of which writes the + * incoming {@link JetInputOutputFormat} items into {@link JetEngine#dataOutStore}. + * + * @param <KeyOut> type of the keys written to the store + * @param <ValueOut> type of the persistent values written to the store + */ +public class JetSink<KeyOut, ValueOut extends PersistentBase> implements ProcessorMetaSupplier { + + /** Local parallelism of the job, captured in {@link #init(Context)}. */ + private transient int localParallelism; + + /** Records the job's local parallelism for use in {@link #get(List)}. */ + @Override + public void init(@Nonnull Context context) { + localParallelism = context.localParallelism(); + } + + /** + * Maps each member address to a {@link ProcessorSupplier} that creates that member's + * {@link SinkProcessor} instances. + * + * <p>NOTE(review): {@code globalIndex} is computed but never passed to {@link SinkProcessor}, + * so all processors are interchangeable. The returned function yields {@code null} for an + * address that is not in {@code addresses}. + * + * @param addresses addresses of the members participating in the job + * @return a lookup from member address to its processor supplier + */ + @Nonnull + @Override + public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> addresses) { + Map<Address, ProcessorSupplier> map = new HashMap<>(); + for (int i = 0; i < addresses.size(); i++) { + //globalIndexBase is the first processor index in a certain Jet-Cluster member + int globalIndexBase = localParallelism * i; + + // processorCount will be equal to localParallelism: + ProcessorSupplier supplier = processorCount -> + range(globalIndexBase, globalIndexBase + processorCount) + .mapToObj(globalIndex -> + new SinkProcessor<KeyOut, ValueOut>() + ).collect(toList()); + map.put(addresses.get(i), supplier); + } + return map::get; + } +} + +/** + * Non-cooperative processor that puts each {@link JetInputOutputFormat} item into + * {@link JetEngine#dataOutStore} and flushes that store when closed. + * + * @param <KeyOut> type of the keys written to the store + * @param <ValueOut> type of the persistent values written to the store + */ +class SinkProcessor<KeyOut, ValueOut extends PersistentBase> extends AbstractProcessor { + + /** + * Reports this processor as non-cooperative, presumably because the store's put/flush calls + * may block on I/O. + * + * @return always {@code false} + */ + @Override + public boolean isCooperative() { + return false; + } + + /** + * Writes {@code item}, cast to {@link JetInputOutputFormat}, into {@link JetEngine#dataOutStore}. + * A {@code ClassCastException} is thrown if the item is not a {@link JetInputOutputFormat}. + * + * @param ordinal inbound edge ordinal (unused) + * @param item the key/value pair to store + * @return always {@code true}, i.e. the item has been consumed + * @throws Exception propagated from the store's {@code put} + */ + @Override + @SuppressWarnings("unchecked") + protected boolean tryProcess(int ordinal, Object item) throws Exception { + JetEngine.dataOutStore.put(((JetInputOutputFormat<KeyOut, ValueOut>) item).getKey(), + ((JetInputOutputFormat<KeyOut, ValueOut>) item).getValue()); + return
true; + } + + /** + * Flushes {@link JetEngine#dataOutStore}, rethrowing a {@link GoraException} wrapped in a + * {@link RuntimeException}. Because the store is a shared static field, it is flushed once per + * processor instance. + */ + @Override + public void close() { + try { + JetEngine.dataOutStore.flush(); + } catch (GoraException e) { + throw new RuntimeException(e); + } + } +} diff --git a/gora-jet/src/main/java/org/apache/gora/jet/JetSource.java b/gora-jet/src/main/java/org/apache/gora/jet/JetSource.java new file mode 100644 index 0000000..86d1d23 --- /dev/null +++ b/gora-jet/src/main/java/org/apache/gora/jet/JetSource.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+package org.apache.gora.jet;
+
+import com.hazelcast.jet.Traverser;
+import com.hazelcast.jet.core.AbstractProcessor;
+import com.hazelcast.jet.core.ProcessorMetaSupplier;
+import com.hazelcast.jet.core.ProcessorSupplier;
+import com.hazelcast.nio.Address;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Result;
+
+import javax.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static com.hazelcast.jet.Traversers.traverseIterable;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.IntStream.range;
+
+/**
+ * Jet source: the processor with global index g reads partitions g, g + T, g + 2T, ... (T = total parallelism) of {@code JetEngine.query}.
+ */
+public class JetSource<KeyIn, ValueIn extends PersistentBase> implements ProcessorMetaSupplier {
+
+  private int totalParallelism; // partition stride per processor; read on members via getPartitionedData (presumably why it is not transient)
+  private transient int localParallelism; // only used in get() to compute each member's first global index
+
+  @Override
+  public void init(@Nonnull Context context) {
+    totalParallelism = context.totalParallelism();
+    localParallelism = context.localParallelism();
+  }
+
+  @Nonnull
+  @Override
+  public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> addresses) {
+    Map<Address, ProcessorSupplier> map = new HashMap<>();
+    for (int i = 0; i < addresses.size(); i++) {
+      // Global processor index = globalIndexBase + local index; globalIndexBase is the first index
+      // on member i (assumes every member runs localParallelism processors).
+      int globalIndexBase = localParallelism * i;
+
+      // processorCount is expected to equal localParallelism:
+      ProcessorSupplier supplier = processorCount ->
+          range(globalIndexBase, globalIndexBase + processorCount)
+              .mapToObj(globalIndex ->
+                  new GoraJetProcessor<KeyIn, ValueIn>(getPartitionedData(globalIndex))
+              ).collect(toList());
+      map.put(addresses.get(i), supplier);
+    }
+    return map::get;
+  }
+
+  /** Reads every record of the partitions owned by processor {@code globalIndex}; any failure is rethrown unchecked. */
+  @SuppressWarnings("unchecked")
+  private List<JetInputOutputFormat<KeyIn, ValueIn>> getPartitionedData(int globalIndex) {
+    try {
+      List<PartitionQuery<KeyIn, ValueIn>> partitionQueries = JetEngine.dataInStore.getPartitions(JetEngine.query);
+      List<JetInputOutputFormat<KeyIn, ValueIn>> resultsList = new ArrayList<>();
+      for (int partitionNo = globalIndex; partitionNo < partitionQueries.size(); partitionNo += totalParallelism) {
+        Result<KeyIn, ValueIn> result = partitionQueries.get(partitionNo).execute();
+        try {
+          while (result.next()) {
+            resultsList.add(new JetInputOutputFormat<>(result.getKey(), result.get()));
+          }
+        } finally {
+          result.close(); // release the partition's result even when iteration fails
+        }
+      }
+      return resultsList;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to read Gora partitions for processor " + globalIndex, e);
+    }
+  }
+}
+
+class GoraJetProcessor<KeyIn, ValueIn extends PersistentBase> extends AbstractProcessor { // emits a pre-loaded list of records
+
+  private final Traverser<JetInputOutputFormat<KeyIn, ValueIn>> traverser;
+
+  GoraJetProcessor(List<JetInputOutputFormat<KeyIn, ValueIn>> list) { // the list is fully materialized before the processor is built
+    this.traverser = traverseIterable(list);
+  }
+
+  @Override
+  public boolean complete() { // emits from the traverser; returns true once everything has been emitted
+    return emitFromTraverser(traverser);
+  }
+}
diff --git a/gora-jet/src/test/avro/pageview.json b/gora-jet/src/test/avro/pageview.json
new file mode 100644
index
0000000..10412f2 --- /dev/null +++ b/gora-jet/src/test/avro/pageview.json @@ -0,0 +1,15 @@ +{ + "type": "record", + "name": "Pageview", "default":null, + "namespace": "org.apache.gora.jet.generated", + "fields" : [ + {"name": "url", "type": ["null","string"], "default":null}, + {"name": "timestamp", "type": "long", "default":0}, + {"name": "ip", "type": ["null","string"], "default":null}, + {"name": "httpMethod", "type": ["null","string"], "default":null}, + {"name": "httpStatusCode", "type": "int", "default":0}, + {"name": "responseSize", "type": "int", "default":0}, + {"name": "referrer", "type": ["null","string"], "default":null}, + {"name": "userAgent", "type": ["null","string"], "default":null} + ] +} diff --git a/gora-jet/src/test/avro/resultPageView.json b/gora-jet/src/test/avro/resultPageView.json new file mode 100644 index 0000000..50eb914 --- /dev/null +++ b/gora-jet/src/test/avro/resultPageView.json @@ -0,0 +1,10 @@ +{ + "type": "record", + "name": "ResultPageView", "default":null, + "namespace": "org.apache.gora.jet.generated", + "fields" : [ + {"name": "url", "type": ["null","string"], "default":null}, + {"name": "timestamp", "type": "long", "default":0}, + {"name": "ip", "type": ["null","string"], "default":null} + ] +} diff --git a/gora-jet/src/test/java/org/apache/gora/jet/JetTest.java b/gora-jet/src/test/java/org/apache/gora/jet/JetTest.java new file mode 100644 index 0000000..1f813db --- /dev/null +++ b/gora-jet/src/test/java/org/apache/gora/jet/JetTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.jet;
+
+import com.hazelcast.core.IMap;
+import com.hazelcast.jet.Jet;
+import com.hazelcast.jet.JetInstance;
+import com.hazelcast.jet.pipeline.BatchSource;
+import com.hazelcast.jet.pipeline.Pipeline;
+import com.hazelcast.jet.pipeline.Sinks;
+import org.apache.gora.jet.generated.Pageview;
+import org.apache.gora.jet.generated.ResultPageView;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.regex.Pattern;
+
+import static com.hazelcast.jet.Traversers.traverseArray;
+import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
+import static com.hazelcast.jet.function.Functions.wholeItem;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the Gora Jet source and sink connectors against an HBase mini-cluster.
+ */
+public class JetTest {
+
+  private static DataStore<Long, ResultPageView> dataStoreOut;
+  private static Query<Long, Pageview> query = null;
+
+  private static HBaseTestingUtility utility;
+
+  @BeforeClass
+  public static void insertData() throws Exception { // starts an HBase mini-cluster and seeds the ResultPageView store with keys 1-3
+    utility = new HBaseTestingUtility();
+    utility.startMiniCluster();
+
+    dataStoreOut = DataStoreFactory.getDataStore(Long.class, ResultPageView.class, utility.getConfiguration());
+
+    ResultPageView resultPageView = new ResultPageView();
+    resultPageView.setIp("88.240.129.183");
+    resultPageView.setTimestamp(123L);
+    resultPageView.setUrl("I am the the one");
+
+    ResultPageView resultPageView1 = new ResultPageView();
+    resultPageView1.setIp("87.240.129.170");
+    resultPageView1.setTimestamp(124L);
+    resultPageView1.setUrl("How are you");
+
+    ResultPageView resultPageView2 = new ResultPageView(); // NOTE(review): the setters below target resultPageView1, not resultPageView2 (likely a copy-paste slip), so key 2 gets ip 88.240.129.183 and key 3 is put with no fields set; testNewJetSource's expectations (2 records, all with that ip) appear to depend on this -- confirm before fixing
+    resultPageView1.setIp("88.240.129.183");
+    resultPageView1.setTimestamp(124L);
+    resultPageView1.setUrl("This is the jet engine");
+
+    dataStoreOut.put(1L,resultPageView);
+    dataStoreOut.put(2L,resultPageView1);
+    dataStoreOut.put(3L,resultPageView2);
+    dataStoreOut.flush();
+  }
+
+  @Test
+  public void testNewJetSource() throws Exception { // Pageview store -> filter on ip -> ResultPageView store keyed by timestamp
+    DataStore<Long, Pageview> dataStoreIn = DataStoreFactory.getDataStore(Long.class, Pageview.class, utility.getConfiguration());
+
+    dataStoreOut = DataStoreFactory.getDataStore(Long.class, ResultPageView.class, utility.getConfiguration());
+
+    query = dataStoreIn.newQuery();
+    query.setStartKey(0L);
+    query.setEndKey(55L);
+
+    JetEngine<Long, Pageview, Long, ResultPageView> jetEngine = new JetEngine<>();
+    BatchSource<JetInputOutputFormat<Long, Pageview>> fileSource = jetEngine.createDataSource(dataStoreIn, query);
+    Pipeline p = Pipeline.create();
+    p.drawFrom(fileSource)
+        .filter(item -> item.getValue().getIp().toString().equals("88.240.129.183"))
+        .map(e -> {
+          ResultPageView resultPageView = new ResultPageView();
+          resultPageView.setIp(e.getValue().getIp());
+          resultPageView.setTimestamp(e.getValue().getTimestamp());
+          resultPageView.setUrl(e.getValue().getUrl());
+          return new JetInputOutputFormat<Long, ResultPageView>(e.getValue().getTimestamp(), resultPageView);
+        })
+        .drainTo(jetEngine.createDataSink(dataStoreOut));
+
+    JetInstance jet = Jet.newJetInstance();
+    Jet.newJetInstance(); // presumably a second member so the job runs on a two-member cluster
+    try {
+      jet.newJob(p).join();
+    } finally {
+      Jet.shutdownAll();
+    }
+
+    Query<Long, ResultPageView> query = dataStoreOut.newQuery();
+    Result<Long, ResultPageView> result = query.execute();
+    int noOfOutputRecords = 0;
+    String ip = "";
+    while (result.next()) {
+      noOfOutputRecords++;
+      ip = result.get().getIp().toString();
+      assertEquals("88.240.129.183", ip);
+    }
+    assertEquals(2, noOfOutputRecords);
+  }
+
+  @Test
+  public void jetWordCount() throws GoraException { // counts words in the ResultPageView urls seeded by insertData()
+    dataStoreOut = DataStoreFactory.getDataStore(Long.class, ResultPageView.class, utility.getConfiguration());
+
+    Query<Long, ResultPageView> query = dataStoreOut.newQuery();
+    JetEngine<Long, ResultPageView, Long, ResultPageView> jetEngine = new JetEngine<>();
+
+    Pattern delimiter = Pattern.compile("\\W+");
+    Pipeline p = Pipeline.create();
+    p.drawFrom(jetEngine.createDataSource(dataStoreOut, query))
+        .flatMap(e -> traverseArray(delimiter.split(e.getValue().getUrl().toString())))
+        .filter(word -> !word.isEmpty())
+        .groupingKey(wholeItem())
+        .aggregate(counting())
+        .drainTo(Sinks.map("COUNTS"));
+    JetInstance jet = Jet.newJetInstance();
+    try {
+      jet.newJob(p).join();
+      IMap<String, Long> counts = jet.getMap("COUNTS"); // read before shutdownAll() below
+      assertEquals(3L, (long) counts.get("the"));
+    } finally {
+      Jet.shutdownAll(); // previously leaked: the member outlived the test and could join later tests' clusters
+    }
+  }
+}
diff --git a/gora-jet/src/test/java/org/apache/gora/jet/generated/Pageview.java b/gora-jet/src/test/java/org/apache/gora/jet/generated/Pageview.java
new file mode 100644
index 0000000..9cef268
--- /dev/null
+++ b/gora-jet/src/test/java/org/apache/gora/jet/generated/Pageview.java
@@ -0,0 +1,881 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the" + *License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, software + *distributed under the License is distributed on an "AS IS" BASIS, + *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + *See the License for the specific language governing permissions and + *limitations under the License. + */ +package org.apache.gora.jet.generated; + +public class Pageview extends org.apache.gora.persistency.impl.PersistentBase implements org.apache.avro.specific.SpecificRecord, org.apache.gora.persistency.Persistent { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Pageview\",\"namespace\":\"org.apache.gora.jet.generated\",\"fields\":[{\"name\":\"url\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp\",\"type\":\"long\",\"default\":0},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"httpMethod\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"httpStatusCode\ [...] + private static final long serialVersionUID = 483475731342327584L; + /** Enum containing all data bean's fields. */ + public static enum Field { + URL(0, "url"), + TIMESTAMP(1, "timestamp"), + IP(2, "ip"), + HTTP_METHOD(3, "httpMethod"), + HTTP_STATUS_CODE(4, "httpStatusCode"), + RESPONSE_SIZE(5, "responseSize"), + REFERRER(6, "referrer"), + USER_AGENT(7, "userAgent"), + ; + /** + * Field's index. + */ + private int index; + + /** + * Field's name. + */ + private String name; + + /** + * Field's constructor + * @param index field's index. 
+ * @param name field's name. + */ + Field(int index, String name) {this.index=index;this.name=name;} + + /** + * Gets field's index. + * @return int field's index. + */ + public int getIndex() {return index;} + + /** + * Gets field's name. + * @return String field's name. + */ + public String getName() {return name;} + + /** + * Gets field's attributes to string. + * @return String field's attributes to string. + */ + public String toString() {return name;} + }; + + public static final String[] _ALL_FIELDS = { + "url", + "timestamp", + "ip", + "httpMethod", + "httpStatusCode", + "responseSize", + "referrer", + "userAgent", + }; + + /** + * Gets the total field count. + * @return int field count + */ + public int getFieldsCount() { + return Pageview._ALL_FIELDS.length; + } + + private java.lang.CharSequence url; + private long timestamp; + private java.lang.CharSequence ip; + private java.lang.CharSequence httpMethod; + private int httpStatusCode; + private int responseSize; + private java.lang.CharSequence referrer; + private java.lang.CharSequence userAgent; + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return url; + case 1: return timestamp; + case 2: return ip; + case 3: return httpMethod; + case 4: return httpStatusCode; + case 5: return responseSize; + case 6: return referrer; + case 7: return userAgent; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + // Used by DatumReader. Applications should not call. 
+ @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value) { + switch (field$) { + case 0: url = (java.lang.CharSequence)(value); break; + case 1: timestamp = (java.lang.Long)(value); break; + case 2: ip = (java.lang.CharSequence)(value); break; + case 3: httpMethod = (java.lang.CharSequence)(value); break; + case 4: httpStatusCode = (java.lang.Integer)(value); break; + case 5: responseSize = (java.lang.Integer)(value); break; + case 6: referrer = (java.lang.CharSequence)(value); break; + case 7: userAgent = (java.lang.CharSequence)(value); break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'url' field. + */ + public java.lang.CharSequence getUrl() { + return url; + } + + /** + * Sets the value of the 'url' field. + * @param value the value to set. + */ + public void setUrl(java.lang.CharSequence value) { + this.url = value; + setDirty(0); + } + + /** + * Checks the dirty status of the 'url' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isUrlDirty() { + return isDirty(0); + } + + /** + * Gets the value of the 'timestamp' field. + */ + public java.lang.Long getTimestamp() { + return timestamp; + } + + /** + * Sets the value of the 'timestamp' field. + * @param value the value to set. + */ + public void setTimestamp(java.lang.Long value) { + this.timestamp = value; + setDirty(1); + } + + /** + * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isTimestampDirty() { + return isDirty(1); + } + + /** + * Gets the value of the 'ip' field. + */ + public java.lang.CharSequence getIp() { + return ip; + } + + /** + * Sets the value of the 'ip' field. + * @param value the value to set. 
+ */ + public void setIp(java.lang.CharSequence value) { + this.ip = value; + setDirty(2); + } + + /** + * Checks the dirty status of the 'ip' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isIpDirty() { + return isDirty(2); + } + + /** + * Gets the value of the 'httpMethod' field. + */ + public java.lang.CharSequence getHttpMethod() { + return httpMethod; + } + + /** + * Sets the value of the 'httpMethod' field. + * @param value the value to set. + */ + public void setHttpMethod(java.lang.CharSequence value) { + this.httpMethod = value; + setDirty(3); + } + + /** + * Checks the dirty status of the 'httpMethod' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isHttpMethodDirty() { + return isDirty(3); + } + + /** + * Gets the value of the 'httpStatusCode' field. + */ + public java.lang.Integer getHttpStatusCode() { + return httpStatusCode; + } + + /** + * Sets the value of the 'httpStatusCode' field. + * @param value the value to set. + */ + public void setHttpStatusCode(java.lang.Integer value) { + this.httpStatusCode = value; + setDirty(4); + } + + /** + * Checks the dirty status of the 'httpStatusCode' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isHttpStatusCodeDirty() { + return isDirty(4); + } + + /** + * Gets the value of the 'responseSize' field. + */ + public java.lang.Integer getResponseSize() { + return responseSize; + } + + /** + * Sets the value of the 'responseSize' field. + * @param value the value to set. + */ + public void setResponseSize(java.lang.Integer value) { + this.responseSize = value; + setDirty(5); + } + + /** + * Checks the dirty status of the 'responseSize' field. 
A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isResponseSizeDirty() { + return isDirty(5); + } + + /** + * Gets the value of the 'referrer' field. + */ + public java.lang.CharSequence getReferrer() { + return referrer; + } + + /** + * Sets the value of the 'referrer' field. + * @param value the value to set. + */ + public void setReferrer(java.lang.CharSequence value) { + this.referrer = value; + setDirty(6); + } + + /** + * Checks the dirty status of the 'referrer' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isReferrerDirty() { + return isDirty(6); + } + + /** + * Gets the value of the 'userAgent' field. + */ + public java.lang.CharSequence getUserAgent() { + return userAgent; + } + + /** + * Sets the value of the 'userAgent' field. + * @param value the value to set. + */ + public void setUserAgent(java.lang.CharSequence value) { + this.userAgent = value; + setDirty(7); + } + + /** + * Checks the dirty status of the 'userAgent' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. 
+ */ + public boolean isUserAgentDirty() { + return isDirty(7); + } + + /** Creates a new Pageview RecordBuilder */ + public static org.apache.gora.jet.generated.Pageview.Builder newBuilder() { + return new org.apache.gora.jet.generated.Pageview.Builder(); + } + + /** Creates a new Pageview RecordBuilder by copying an existing Builder */ + public static org.apache.gora.jet.generated.Pageview.Builder newBuilder(org.apache.gora.jet.generated.Pageview.Builder other) { + return new org.apache.gora.jet.generated.Pageview.Builder(other); + } + + /** Creates a new Pageview RecordBuilder by copying an existing Pageview instance */ + public static org.apache.gora.jet.generated.Pageview.Builder newBuilder(org.apache.gora.jet.generated.Pageview other) { + return new org.apache.gora.jet.generated.Pageview.Builder(other); + } + + @Override + public org.apache.gora.jet.generated.Pageview clone() { + return newBuilder(this).build(); + } + + private static java.nio.ByteBuffer deepCopyToReadOnlyBuffer( + java.nio.ByteBuffer input) { + java.nio.ByteBuffer copy = java.nio.ByteBuffer.allocate(input.capacity()); + int position = input.position(); + input.reset(); + int mark = input.position(); + int limit = input.limit(); + input.rewind(); + input.limit(input.capacity()); + copy.put(input); + input.rewind(); + copy.rewind(); + input.position(mark); + input.mark(); + copy.position(mark); + copy.mark(); + input.position(position); + copy.position(position); + input.limit(limit); + copy.limit(limit); + return copy.asReadOnlyBuffer(); + } + + /** + * RecordBuilder for Pageview instances. 
+ */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Pageview> + implements org.apache.avro.data.RecordBuilder<Pageview> { + + private java.lang.CharSequence url; + private long timestamp; + private java.lang.CharSequence ip; + private java.lang.CharSequence httpMethod; + private int httpStatusCode; + private int responseSize; + private java.lang.CharSequence referrer; + private java.lang.CharSequence userAgent; + + /** Creates a new Builder */ + private Builder() { + super(org.apache.gora.jet.generated.Pageview.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(org.apache.gora.jet.generated.Pageview.Builder other) { + super(other); + } + + /** Creates a Builder by copying an existing Pageview instance */ + private Builder(org.apache.gora.jet.generated.Pageview other) { + super(org.apache.gora.jet.generated.Pageview.SCHEMA$); + if (isValidValue(fields()[0], other.url)) { + this.url = (java.lang.CharSequence) data().deepCopy(fields()[0].schema(), other.url); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.timestamp)) { + this.timestamp = (java.lang.Long) data().deepCopy(fields()[1].schema(), other.timestamp); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.ip)) { + this.ip = (java.lang.CharSequence) data().deepCopy(fields()[2].schema(), other.ip); + fieldSetFlags()[2] = true; + } + if (isValidValue(fields()[3], other.httpMethod)) { + this.httpMethod = (java.lang.CharSequence) data().deepCopy(fields()[3].schema(), other.httpMethod); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.httpStatusCode)) { + this.httpStatusCode = (java.lang.Integer) data().deepCopy(fields()[4].schema(), other.httpStatusCode); + fieldSetFlags()[4] = true; + } + if (isValidValue(fields()[5], other.responseSize)) { + this.responseSize = (java.lang.Integer) data().deepCopy(fields()[5].schema(), other.responseSize); + fieldSetFlags()[5] = true; + } 
+ if (isValidValue(fields()[6], other.referrer)) { + this.referrer = (java.lang.CharSequence) data().deepCopy(fields()[6].schema(), other.referrer); + fieldSetFlags()[6] = true; + } + if (isValidValue(fields()[7], other.userAgent)) { + this.userAgent = (java.lang.CharSequence) data().deepCopy(fields()[7].schema(), other.userAgent); + fieldSetFlags()[7] = true; + } + } + + /** Gets the value of the 'url' field */ + public java.lang.CharSequence getUrl() { + return url; + } + + /** Sets the value of the 'url' field */ + public org.apache.gora.jet.generated.Pageview.Builder setUrl(java.lang.CharSequence value) { + validate(fields()[0], value); + this.url = value; + fieldSetFlags()[0] = true; + return this; + } + + /** Checks whether the 'url' field has been set */ + public boolean hasUrl() { + return fieldSetFlags()[0]; + } + + /** Clears the value of the 'url' field */ + public org.apache.gora.jet.generated.Pageview.Builder clearUrl() { + url = null; + fieldSetFlags()[0] = false; + return this; + } + + /** Gets the value of the 'timestamp' field */ + public java.lang.Long getTimestamp() { + return timestamp; + } + + /** Sets the value of the 'timestamp' field */ + public org.apache.gora.jet.generated.Pageview.Builder setTimestamp(long value) { + validate(fields()[1], value); + this.timestamp = value; + fieldSetFlags()[1] = true; + return this; + } + + /** Checks whether the 'timestamp' field has been set */ + public boolean hasTimestamp() { + return fieldSetFlags()[1]; + } + + /** Clears the value of the 'timestamp' field */ + public org.apache.gora.jet.generated.Pageview.Builder clearTimestamp() { + fieldSetFlags()[1] = false; + return this; + } + + /** Gets the value of the 'ip' field */ + public java.lang.CharSequence getIp() { + return ip; + } + + /** Sets the value of the 'ip' field */ + public org.apache.gora.jet.generated.Pageview.Builder setIp(java.lang.CharSequence value) { + validate(fields()[2], value); + this.ip = value; + fieldSetFlags()[2] = true; + 
return this; + } + + /** Checks whether the 'ip' field has been set */ + public boolean hasIp() { + return fieldSetFlags()[2]; + } + + /** Clears the value of the 'ip' field */ + public org.apache.gora.jet.generated.Pageview.Builder clearIp() { + ip = null; + fieldSetFlags()[2] = false; + return this; + } + + /** Gets the value of the 'httpMethod' field */ + public java.lang.CharSequence getHttpMethod() { + return httpMethod; + } + + /** Sets the value of the 'httpMethod' field */ + public org.apache.gora.jet.generated.Pageview.Builder setHttpMethod(java.lang.CharSequence value) { + validate(fields()[3], value); + this.httpMethod = value; + fieldSetFlags()[3] = true; + return this; + } + + /** Checks whether the 'httpMethod' field has been set */ + public boolean hasHttpMethod() { + return fieldSetFlags()[3]; + } + + /** Clears the value of the 'httpMethod' field */ + public org.apache.gora.jet.generated.Pageview.Builder clearHttpMethod() { + httpMethod = null; + fieldSetFlags()[3] = false; + return this; + } + + /** Gets the value of the 'httpStatusCode' field */ + public java.lang.Integer getHttpStatusCode() { + return httpStatusCode; + } + + /** Sets the value of the 'httpStatusCode' field */ + public org.apache.gora.jet.generated.Pageview.Builder setHttpStatusCode(int value) { + validate(fields()[4], value); + this.httpStatusCode = value; + fieldSetFlags()[4] = true; + return this; + } + + /** Checks whether the 'httpStatusCode' field has been set */ + public boolean hasHttpStatusCode() { + return fieldSetFlags()[4]; + } + + /** Clears the value of the 'httpStatusCode' field */ + public org.apache.gora.jet.generated.Pageview.Builder clearHttpStatusCode() { + fieldSetFlags()[4] = false; + return this; + } + + /** Gets the value of the 'responseSize' field */ + public java.lang.Integer getResponseSize() { + return responseSize; + } + + /** Sets the value of the 'responseSize' field */ + public org.apache.gora.jet.generated.Pageview.Builder setResponseSize(int 
value) { + validate(fields()[5], value); + this.responseSize = value; + fieldSetFlags()[5] = true; + return this; + } + + /** Checks whether the 'responseSize' field has been set */ + public boolean hasResponseSize() { + return fieldSetFlags()[5]; + } + + /** Clears the value of the 'responseSize' field */ + public org.apache.gora.jet.generated.Pageview.Builder clearResponseSize() { + fieldSetFlags()[5] = false; + return this; + } + + /** Gets the value of the 'referrer' field */ + public java.lang.CharSequence getReferrer() { + return referrer; + } + + /** Sets the value of the 'referrer' field */ + public org.apache.gora.jet.generated.Pageview.Builder setReferrer(java.lang.CharSequence value) { + validate(fields()[6], value); + this.referrer = value; + fieldSetFlags()[6] = true; + return this; + } + + /** Checks whether the 'referrer' field has been set */ + public boolean hasReferrer() { + return fieldSetFlags()[6]; + } + + /** Clears the value of the 'referrer' field */ + public org.apache.gora.jet.generated.Pageview.Builder clearReferrer() { + referrer = null; + fieldSetFlags()[6] = false; + return this; + } + + /** Gets the value of the 'userAgent' field */ + public java.lang.CharSequence getUserAgent() { + return userAgent; + } + + /** Sets the value of the 'userAgent' field */ + public org.apache.gora.jet.generated.Pageview.Builder setUserAgent(java.lang.CharSequence value) { + validate(fields()[7], value); + this.userAgent = value; + fieldSetFlags()[7] = true; + return this; + } + + /** Checks whether the 'userAgent' field has been set */ + public boolean hasUserAgent() { + return fieldSetFlags()[7]; + } + + /** Clears the value of the 'userAgent' field */ + public org.apache.gora.jet.generated.Pageview.Builder clearUserAgent() { + userAgent = null; + fieldSetFlags()[7] = false; + return this; + } + + @Override + public Pageview build() { + try { + Pageview record = new Pageview(); + record.url = fieldSetFlags()[0] ? 
this.url : (java.lang.CharSequence) defaultValue(fields()[0]); + record.timestamp = fieldSetFlags()[1] ? this.timestamp : (java.lang.Long) defaultValue(fields()[1]); + record.ip = fieldSetFlags()[2] ? this.ip : (java.lang.CharSequence) defaultValue(fields()[2]); + record.httpMethod = fieldSetFlags()[3] ? this.httpMethod : (java.lang.CharSequence) defaultValue(fields()[3]); + record.httpStatusCode = fieldSetFlags()[4] ? this.httpStatusCode : (java.lang.Integer) defaultValue(fields()[4]); + record.responseSize = fieldSetFlags()[5] ? this.responseSize : (java.lang.Integer) defaultValue(fields()[5]); + record.referrer = fieldSetFlags()[6] ? this.referrer : (java.lang.CharSequence) defaultValue(fields()[6]); + record.userAgent = fieldSetFlags()[7] ? this.userAgent : (java.lang.CharSequence) defaultValue(fields()[7]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + public Pageview.Tombstone getTombstone(){ + return TOMBSTONE; + } + + public Pageview newInstance(){ + return newBuilder().build(); + } + + private static final Tombstone TOMBSTONE = new Tombstone(); + + public static final class Tombstone extends Pageview implements org.apache.gora.persistency.Tombstone { + + private Tombstone() { } + + /** + * Gets the value of the 'url' field. + */ + public java.lang.CharSequence getUrl() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'url' field. + * @param value the value to set. + */ + public void setUrl(java.lang.CharSequence value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'url' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. 
+ */ + public boolean isUrlDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + /** + * Gets the value of the 'timestamp' field. + */ + public java.lang.Long getTimestamp() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'timestamp' field. + * @param value the value to set. + */ + public void setTimestamp(java.lang.Long value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isTimestampDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + /** + * Gets the value of the 'ip' field. + */ + public java.lang.CharSequence getIp() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'ip' field. + * @param value the value to set. + */ + public void setIp(java.lang.CharSequence value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'ip' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isIpDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + /** + * Gets the value of the 'httpMethod' field. + */ + public java.lang.CharSequence getHttpMethod() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'httpMethod' field. + * @param value the value to set. 
+ */ + public void setHttpMethod(java.lang.CharSequence value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'httpMethod' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isHttpMethodDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + /** + * Gets the value of the 'httpStatusCode' field. + */ + public java.lang.Integer getHttpStatusCode() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'httpStatusCode' field. + * @param value the value to set. + */ + public void setHttpStatusCode(java.lang.Integer value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'httpStatusCode' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isHttpStatusCodeDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + /** + * Gets the value of the 'responseSize' field. + */ + public java.lang.Integer getResponseSize() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'responseSize' field. + * @param value the value to set. + */ + public void setResponseSize(java.lang.Integer value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'responseSize' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. 
+ */ + public boolean isResponseSizeDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + /** + * Gets the value of the 'referrer' field. + */ + public java.lang.CharSequence getReferrer() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'referrer' field. + * @param value the value to set. + */ + public void setReferrer(java.lang.CharSequence value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'referrer' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isReferrerDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + /** + * Gets the value of the 'userAgent' field. + */ + public java.lang.CharSequence getUserAgent() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'userAgent' field. + * @param value the value to set. + */ + public void setUserAgent(java.lang.CharSequence value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'userAgent' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. 
+ */ + public boolean isUserAgentDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + + } + + private static final org.apache.avro.io.DatumWriter + DATUM_WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$); + private static final org.apache.avro.io.DatumReader + DATUM_READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$); + + /** + * Writes AVRO data bean to output stream in the form of AVRO Binary encoding format. This will transform + * AVRO data bean from its Java object form to it s serializable form. + * + * @param out java.io.ObjectOutput output stream to write data bean in serializable form + */ + @Override + public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + out.write(super.getDirtyBytes().array()); + DATUM_WRITER$.write(this, org.apache.avro.io.EncoderFactory.get() + .directBinaryEncoder((java.io.OutputStream) out, + null)); + } + + /** + * Reads AVRO data bean from input stream in it s AVRO Binary encoding format to Java object format. + * This will transform AVRO data bean from it s serializable form to deserialized Java object form. 
+ * + * @param in java.io.ObjectOutput input stream to read data bean in serializable form + */ + @Override + public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + byte[] __g__dirty = new byte[getFieldsCount()]; + in.read(__g__dirty); + super.setDirtyBytes(java.nio.ByteBuffer.wrap(__g__dirty)); + DATUM_READER$.read(this, org.apache.avro.io.DecoderFactory.get() + .directBinaryDecoder((java.io.InputStream) in, + null)); + } + +} + diff --git a/gora-jet/src/test/java/org/apache/gora/jet/generated/ResultPageView.java b/gora-jet/src/test/java/org/apache/gora/jet/generated/ResultPageView.java new file mode 100644 index 0000000..c1f43fb --- /dev/null +++ b/gora-jet/src/test/java/org/apache/gora/jet/generated/ResultPageView.java @@ -0,0 +1,468 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the" + *License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, software + *distributed under the License is distributed on an "AS IS" BASIS, + *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + *See the License for the specific language governing permissions and + *limitations under the License. 
+ */ +package org.apache.gora.jet.generated; + +public class ResultPageView extends org.apache.gora.persistency.impl.PersistentBase implements org.apache.avro.specific.SpecificRecord, org.apache.gora.persistency.Persistent { + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ResultPageView\",\"namespace\":\"org.apache.gora.jet.generated\",\"fields\":[{\"name\":\"url\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"timestamp\",\"type\":\"long\",\"default\":0},{\"name\":\"ip\",\"type\":[\"null\",\"string\"],\"default\":null}],\"default\":null}"); + private static final long serialVersionUID = -7453871077431322534L; + /** Enum containing all data bean's fields. */ + public static enum Field { + URL(0, "url"), + TIMESTAMP(1, "timestamp"), + IP(2, "ip"), + ; + /** + * Field's index. + */ + private int index; + + /** + * Field's name. + */ + private String name; + + /** + * Field's constructor + * @param index field's index. + * @param name field's name. + */ + Field(int index, String name) {this.index=index;this.name=name;} + + /** + * Gets field's index. + * @return int field's index. + */ + public int getIndex() {return index;} + + /** + * Gets field's name. + * @return String field's name. + */ + public String getName() {return name;} + + /** + * Gets field's attributes to string. + * @return String field's attributes to string. + */ + public String toString() {return name;} + }; + + public static final String[] _ALL_FIELDS = { + "url", + "timestamp", + "ip", + }; + + /** + * Gets the total field count. + * @return int field count + */ + public int getFieldsCount() { + return ResultPageView._ALL_FIELDS.length; + } + + private java.lang.CharSequence url; + private long timestamp; + private java.lang.CharSequence ip; + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. 
+ public java.lang.Object get(int field$) { + switch (field$) { + case 0: return url; + case 1: return timestamp; + case 2: return ip; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value) { + switch (field$) { + case 0: url = (java.lang.CharSequence)(value); break; + case 1: timestamp = (java.lang.Long)(value); break; + case 2: ip = (java.lang.CharSequence)(value); break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'url' field. + */ + public java.lang.CharSequence getUrl() { + return url; + } + + /** + * Sets the value of the 'url' field. + * @param value the value to set. + */ + public void setUrl(java.lang.CharSequence value) { + this.url = value; + setDirty(0); + } + + /** + * Checks the dirty status of the 'url' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isUrlDirty() { + return isDirty(0); + } + + /** + * Gets the value of the 'timestamp' field. + */ + public java.lang.Long getTimestamp() { + return timestamp; + } + + /** + * Sets the value of the 'timestamp' field. + * @param value the value to set. + */ + public void setTimestamp(java.lang.Long value) { + this.timestamp = value; + setDirty(1); + } + + /** + * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isTimestampDirty() { + return isDirty(1); + } + + /** + * Gets the value of the 'ip' field. + */ + public java.lang.CharSequence getIp() { + return ip; + } + + /** + * Sets the value of the 'ip' field. + * @param value the value to set. 
+ */ + public void setIp(java.lang.CharSequence value) { + this.ip = value; + setDirty(2); + } + + /** + * Checks the dirty status of the 'ip' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isIpDirty() { + return isDirty(2); + } + + /** Creates a new ResultPageView RecordBuilder */ + public static org.apache.gora.jet.generated.ResultPageView.Builder newBuilder() { + return new org.apache.gora.jet.generated.ResultPageView.Builder(); + } + + /** Creates a new ResultPageView RecordBuilder by copying an existing Builder */ + public static org.apache.gora.jet.generated.ResultPageView.Builder newBuilder(org.apache.gora.jet.generated.ResultPageView.Builder other) { + return new org.apache.gora.jet.generated.ResultPageView.Builder(other); + } + + /** Creates a new ResultPageView RecordBuilder by copying an existing ResultPageView instance */ + public static org.apache.gora.jet.generated.ResultPageView.Builder newBuilder(org.apache.gora.jet.generated.ResultPageView other) { + return new org.apache.gora.jet.generated.ResultPageView.Builder(other); + } + + @Override + public org.apache.gora.jet.generated.ResultPageView clone() { + return newBuilder(this).build(); + } + + private static java.nio.ByteBuffer deepCopyToReadOnlyBuffer( + java.nio.ByteBuffer input) { + java.nio.ByteBuffer copy = java.nio.ByteBuffer.allocate(input.capacity()); + int position = input.position(); + input.reset(); + int mark = input.position(); + int limit = input.limit(); + input.rewind(); + input.limit(input.capacity()); + copy.put(input); + input.rewind(); + copy.rewind(); + input.position(mark); + input.mark(); + copy.position(mark); + copy.mark(); + input.position(position); + copy.position(position); + input.limit(limit); + copy.limit(limit); + return copy.asReadOnlyBuffer(); + } + + /** + * RecordBuilder for ResultPageView instances. 
+ */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<ResultPageView> + implements org.apache.avro.data.RecordBuilder<ResultPageView> { + + private java.lang.CharSequence url; + private long timestamp; + private java.lang.CharSequence ip; + + /** Creates a new Builder */ + private Builder() { + super(org.apache.gora.jet.generated.ResultPageView.SCHEMA$); + } + + /** Creates a Builder by copying an existing Builder */ + private Builder(org.apache.gora.jet.generated.ResultPageView.Builder other) { + super(other); + } + + /** Creates a Builder by copying an existing ResultPageView instance */ + private Builder(org.apache.gora.jet.generated.ResultPageView other) { + super(org.apache.gora.jet.generated.ResultPageView.SCHEMA$); + if (isValidValue(fields()[0], other.url)) { + this.url = (java.lang.CharSequence) data().deepCopy(fields()[0].schema(), other.url); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.timestamp)) { + this.timestamp = (java.lang.Long) data().deepCopy(fields()[1].schema(), other.timestamp); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.ip)) { + this.ip = (java.lang.CharSequence) data().deepCopy(fields()[2].schema(), other.ip); + fieldSetFlags()[2] = true; + } + } + + /** Gets the value of the 'url' field */ + public java.lang.CharSequence getUrl() { + return url; + } + + /** Sets the value of the 'url' field */ + public org.apache.gora.jet.generated.ResultPageView.Builder setUrl(java.lang.CharSequence value) { + validate(fields()[0], value); + this.url = value; + fieldSetFlags()[0] = true; + return this; + } + + /** Checks whether the 'url' field has been set */ + public boolean hasUrl() { + return fieldSetFlags()[0]; + } + + /** Clears the value of the 'url' field */ + public org.apache.gora.jet.generated.ResultPageView.Builder clearUrl() { + url = null; + fieldSetFlags()[0] = false; + return this; + } + + /** Gets the value of the 'timestamp' field */ + public 
java.lang.Long getTimestamp() { + return timestamp; + } + + /** Sets the value of the 'timestamp' field */ + public org.apache.gora.jet.generated.ResultPageView.Builder setTimestamp(long value) { + validate(fields()[1], value); + this.timestamp = value; + fieldSetFlags()[1] = true; + return this; + } + + /** Checks whether the 'timestamp' field has been set */ + public boolean hasTimestamp() { + return fieldSetFlags()[1]; + } + + /** Clears the value of the 'timestamp' field */ + public org.apache.gora.jet.generated.ResultPageView.Builder clearTimestamp() { + fieldSetFlags()[1] = false; + return this; + } + + /** Gets the value of the 'ip' field */ + public java.lang.CharSequence getIp() { + return ip; + } + + /** Sets the value of the 'ip' field */ + public org.apache.gora.jet.generated.ResultPageView.Builder setIp(java.lang.CharSequence value) { + validate(fields()[2], value); + this.ip = value; + fieldSetFlags()[2] = true; + return this; + } + + /** Checks whether the 'ip' field has been set */ + public boolean hasIp() { + return fieldSetFlags()[2]; + } + + /** Clears the value of the 'ip' field */ + public org.apache.gora.jet.generated.ResultPageView.Builder clearIp() { + ip = null; + fieldSetFlags()[2] = false; + return this; + } + + @Override + public ResultPageView build() { + try { + ResultPageView record = new ResultPageView(); + record.url = fieldSetFlags()[0] ? this.url : (java.lang.CharSequence) defaultValue(fields()[0]); + record.timestamp = fieldSetFlags()[1] ? this.timestamp : (java.lang.Long) defaultValue(fields()[1]); + record.ip = fieldSetFlags()[2] ? 
this.ip : (java.lang.CharSequence) defaultValue(fields()[2]); + return record; + } catch (Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + public ResultPageView.Tombstone getTombstone(){ + return TOMBSTONE; + } + + public ResultPageView newInstance(){ + return newBuilder().build(); + } + + private static final Tombstone TOMBSTONE = new Tombstone(); + + public static final class Tombstone extends ResultPageView implements org.apache.gora.persistency.Tombstone { + + private Tombstone() { } + + /** + * Gets the value of the 'url' field. + */ + public java.lang.CharSequence getUrl() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'url' field. + * @param value the value to set. + */ + public void setUrl(java.lang.CharSequence value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'url' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isUrlDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + /** + * Gets the value of the 'timestamp' field. + */ + public java.lang.Long getTimestamp() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'timestamp' field. + * @param value the value to set. + */ + public void setTimestamp(java.lang.Long value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'timestamp' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. 
+ */ + public boolean isTimestampDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + /** + * Gets the value of the 'ip' field. + */ + public java.lang.CharSequence getIp() { + throw new java.lang.UnsupportedOperationException("Get is not supported on tombstones"); + } + + /** + * Sets the value of the 'ip' field. + * @param value the value to set. + */ + public void setIp(java.lang.CharSequence value) { + throw new java.lang.UnsupportedOperationException("Set is not supported on tombstones"); + } + + /** + * Checks the dirty status of the 'ip' field. A field is dirty if it represents a change that has not yet been written to the database. + * @param value the value to set. + */ + public boolean isIpDirty() { + throw new java.lang.UnsupportedOperationException("IsDirty is not supported on tombstones"); + } + + + } + + private static final org.apache.avro.io.DatumWriter + DATUM_WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$); + private static final org.apache.avro.io.DatumReader + DATUM_READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$); + + /** + * Writes AVRO data bean to output stream in the form of AVRO Binary encoding format. This will transform + * AVRO data bean from its Java object form to it s serializable form. + * + * @param out java.io.ObjectOutput output stream to write data bean in serializable form + */ + @Override + public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + out.write(super.getDirtyBytes().array()); + DATUM_WRITER$.write(this, org.apache.avro.io.EncoderFactory.get() + .directBinaryEncoder((java.io.OutputStream) out, + null)); + } + + /** + * Reads AVRO data bean from input stream in it s AVRO Binary encoding format to Java object format. + * This will transform AVRO data bean from it s serializable form to deserialized Java object form. 
+ * + * @param in java.io.ObjectOutput input stream to read data bean in serializable form + */ + @Override + public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + byte[] __g__dirty = new byte[getFieldsCount()]; + in.read(__g__dirty); + super.setDirtyBytes(java.nio.ByteBuffer.wrap(__g__dirty)); + DATUM_READER$.read(this, org.apache.avro.io.DecoderFactory.get() + .directBinaryDecoder((java.io.InputStream) in, + null)); + } + +} + diff --git a/gora-tutorial/pom.xml b/gora-tutorial/pom.xml index e89ff95..d0fcf8f 100644 --- a/gora-tutorial/pom.xml +++ b/gora-tutorial/pom.xml @@ -105,6 +105,11 @@ <dependency> <groupId>org.apache.gora</groupId> + <artifactId>gora-jet</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.gora</groupId> <artifactId>gora-jcache</artifactId> </dependency> diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsJet.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsJet.java new file mode 100644 index 0000000..e17d490 --- /dev/null +++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsJet.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gora.tutorial.log; + +import com.hazelcast.jet.Jet; +import com.hazelcast.jet.JetInstance; +import com.hazelcast.jet.pipeline.Pipeline; +import org.apache.gora.jet.JetEngine; +import org.apache.gora.jet.JetInputOutputFormat; +import org.apache.gora.query.Query; +import org.apache.gora.store.DataStore; +import org.apache.gora.store.DataStoreFactory; +import org.apache.gora.tutorial.log.generated.MetricDatum; +import org.apache.gora.tutorial.log.generated.Pageview; +import org.apache.hadoop.conf.Configuration; + +import java.util.Map; + +import static com.hazelcast.jet.aggregate.AggregateOperations.counting; +import static com.hazelcast.jet.aggregate.AggregateOperations.groupingBy; + +/** + * LogAnalyticsJet is the tutorial class to illustrate Gora's Hazelcast Jet API. + * The analytics jet reads the web access data stored earlier by the + * {@link LogManager}, and calculates the aggregate daily pageviews. The + * output is stored in a Gora compatible data store. + * + * <p>See the tutorial.html file in docs or go to the + * <a href="http://gora.apache.org/current/tutorial.html"> + * web site</a>for more information.</p> + */ +public class LogAnalyticsJet { + + private static DataStore<Long, Pageview> inStore; + private static DataStore<String, MetricDatum> outStore; + + /** + * The number of miliseconds in a day + */ + private static final long DAY_MILIS = 1000 * 60 * 60 * 24; + + /** + * In the main method pageviews are fetched though the jet source connector. + * Then those are grouped by url and day. Then a counting aggregator is + * applied to calculate the aggregated daily pageviews. Then the result is + * output through the jet sink connector to a gora compatible data store. 
+ */ + public static void main(String[] args) throws Exception{ + + Configuration conf = new Configuration(); + + inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf); + outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf); + + Query<Long, Pageview> query = inStore.newQuery(); + JetEngine<Long, Pageview, String, MetricDatum> jetEngine = new JetEngine<>(); + + Pipeline p = Pipeline.create(); + p.drawFrom(jetEngine.createDataSource(inStore, query)) + .groupingKey(e -> e.getValue().getUrl().toString()) + .aggregate(groupingBy(e -> getDay(e.getValue().getTimestamp()), counting())) + .map(e -> { + MetricDatum metricDatum = new MetricDatum(); + String url = e.getKey(); + for (Map.Entry<Long, Long> item : e.getValue().entrySet()) { + long timeStamp = item.getKey(); + long sum = item.getKey(); + metricDatum.setTimestamp(timeStamp); + metricDatum.setMetric(sum); + } + metricDatum.setMetricDimension(url); + return new JetInputOutputFormat<String, MetricDatum>(url + "_" + "ip", metricDatum); + }) + .peek() + .drainTo(jetEngine.createDataSink(outStore)); + + JetInstance jet = Jet.newJetInstance(); + try { + jet.newJob(p).join(); + } finally { + Jet.shutdownAll(); + } + } + + /** + * Rolls up the given timestamp to the day cardinality, so that + * data can be aggregated daily + */ + private static long getDay(long timeStamp) { + return (timeStamp / DAY_MILIS) * DAY_MILIS; + } + +} diff --git a/pom.xml b/pom.xml index 6710e88..8f82b20 100755 --- a/pom.xml +++ b/pom.xml @@ -801,6 +801,7 @@ <module>gora-kudu</module> <module>gora-hive</module> <module>gora-redis</module> + <module>gora-jet</module> <module>gora-tutorial</module> <module>gora-benchmark</module> <module>sources-dist</module> @@ -856,7 +857,8 @@ <!-- JCache Dependencies --> <jsr107.api.version>1.0.0</jsr107.api.version> - <hazelcast.version>3.6.4</hazelcast.version> + <hazelcast.version>3.12.2</hazelcast.version> + <hazelcast.jet.version>3.1</hazelcast.jet.version> <!-- 
OrientDB Dependencies --> <orientdb.version>2.2.22</orientdb.version> @@ -973,6 +975,12 @@ <dependency> <groupId>org.apache.gora</groupId> + <artifactId>gora-jet</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.gora</groupId> <artifactId>gora-hbase</artifactId> <version>${project.version}</version> </dependency> @@ -1763,6 +1771,12 @@ </exclusions> </dependency> + <dependency> + <groupId>com.hazelcast.jet</groupId> + <artifactId>hazelcast-jet</artifactId> + <version>${hazelcast.jet.version}</version> + </dependency> + <!--Aerospike Dependency --> <dependency> <groupId>com.aerospike</groupId>