[ https://issues.apache.org/jira/browse/PIO-168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641089#comment-16641089 ]
ASF GitHub Bot commented on PIO-168: ------------------------------------ asfgit closed pull request #466: [PIO-168] ES 6.X support + patch version dependency updates URL: https://github.com/apache/predictionio/pull/466 This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign pull request (one from a fork) once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/.travis.yml b/.travis.yml index bdceb7ea5..abd4ab05c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -59,13 +59,13 @@ env: METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS PIO_SCALA_VERSION=2.11.8 PIO_SPARK_VERSION=2.0.2 - PIO_ELASTICSEARCH_VERSION=5.5.2 + PIO_ELASTICSEARCH_VERSION=5.6.9 PIO_HADOOP_VERSION=2.6.5 - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS PIO_SCALA_VERSION=2.11.8 PIO_SPARK_VERSION=2.0.2 - PIO_ELASTICSEARCH_VERSION=5.5.2 + PIO_ELASTICSEARCH_VERSION=5.6.9 PIO_HADOOP_VERSION=2.6.5 - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS @@ -83,66 +83,66 @@ env: METADATA_REP=ELASTICSEARCH EVENTDATA_REP=ELASTICSEARCH MODELDATA_REP=S3 PIO_SCALA_VERSION=2.11.8 PIO_SPARK_VERSION=2.0.2 - PIO_ELASTICSEARCH_VERSION=5.5.2 + PIO_ELASTICSEARCH_VERSION=5.6.9 PIO_HADOOP_VERSION=2.6.5 - BUILD_TYPE=Unit METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL - PIO_SCALA_VERSION=2.11.8 + PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.1.1 - BUILD_TYPE=Integration METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL - PIO_SCALA_VERSION=2.11.8 + PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.1.1 - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS - PIO_SCALA_VERSION=2.11.8 + PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.1.1 - PIO_ELASTICSEARCH_VERSION=5.5.2 + PIO_ELASTICSEARCH_VERSION=5.6.9 - BUILD_TYPE=Integration 
METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS - PIO_SCALA_VERSION=2.11.8 + PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.1.1 - PIO_ELASTICSEARCH_VERSION=5.5.2 + PIO_ELASTICSEARCH_VERSION=5.6.9 - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS - PIO_SCALA_VERSION=2.11.8 + PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.1.1 PIO_ELASTICSEARCH_VERSION=1.7.3 - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS - PIO_SCALA_VERSION=2.11.8 + PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.1.1 PIO_ELASTICSEARCH_VERSION=1.7.3 - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=ELASTICSEARCH MODELDATA_REP=S3 - PIO_SCALA_VERSION=2.11.8 + PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.1.1 - PIO_ELASTICSEARCH_VERSION=5.5.2 + PIO_ELASTICSEARCH_VERSION=5.6.9 - BUILD_TYPE=Unit METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL - PIO_SCALA_VERSION=2.11.8 + PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.2.0 - BUILD_TYPE=Integration METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL - PIO_SCALA_VERSION=2.11.8 + PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.2.0 - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS - PIO_SCALA_VERSION=2.11.8 + PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.2.0 PIO_ELASTICSEARCH_VERSION=1.7.3 - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS - PIO_SCALA_VERSION=2.11.8 + PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.2.0 - PIO_ELASTICSEARCH_VERSION=5.5.2 + PIO_ELASTICSEARCH_VERSION=5.6.9 - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=ELASTICSEARCH MODELDATA_REP=S3 - PIO_SCALA_VERSION=2.11.8 + PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.2.0 - PIO_ELASTICSEARCH_VERSION=5.5.2 + PIO_ELASTICSEARCH_VERSION=5.6.9 - BUILD_TYPE=Unit METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL @@ -164,13 +164,13 @@ env: 
METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.3.1 - PIO_ELASTICSEARCH_VERSION=5.5.2 + PIO_ELASTICSEARCH_VERSION=5.6.9 PIO_HADOOP_VERSION=2.7.7 - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=ELASTICSEARCH MODELDATA_REP=S3 PIO_SCALA_VERSION=2.11.12 PIO_SPARK_VERSION=2.3.1 - PIO_ELASTICSEARCH_VERSION=5.5.2 + PIO_ELASTICSEARCH_VERSION=5.6.9 PIO_HADOOP_VERSION=2.7.7 - BUILD_TYPE=LicenseCheck diff --git a/LICENSE.txt b/LICENSE.txt index a05f3d83b..3d5c57c7c 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -1703,15 +1703,15 @@ EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Binary distribution bundles - org.scala-lang # scala-library # 2.11.8 (http://scala-lang.org/) - org.scala-lang # scala-compiler # 2.11.8 (http://scala-lang.org/) - org.scala-lang # scala-reflect # 2.11.8 (http://scala-lang.org/) - org.scala-lang # scalap # 2.11.8 (http://scala-lang.org/) + org.scala-lang # scala-library # 2.11.12 (http://scala-lang.org/) + org.scala-lang # scala-compiler # 2.11.12 (http://scala-lang.org/) + org.scala-lang # scala-reflect # 2.11.12 (http://scala-lang.org/) + org.scala-lang # scalap # 2.11.12 (http://scala-lang.org/) org.scala-lang.modules # scala-java8-compat_2.11 # 0.7.0 (http://scala-lang.org/) org.scala-lang.modules # scala-parser-combinators_2.11 # 1.0.4 (http://scala-lang.org/) org.scala-lang.modules # scala-parser-combinators_2.11 # 1.0.6 (http://scala-lang.org/) org.scala-lang.modules # scala-xml_2.11 # 1.0.3 (http://scala-lang.org/) - org.scala-lang.modules # scala-xml_2.11 # 1.0.4 (http://scala-lang.org/) + org.scala-lang.modules # scala-xml_2.11 # 1.0.5 (http://scala-lang.org/) org.scala-lang.modules # scala-xml_2.11 # 1.0.6 (http://scala-lang.org/) which is available under the BSD license (http://www.scala-lang.org/downloads/license.html) diff --git a/bin/install.sh b/bin/install.sh index 
d785b1237..93e4eb501 100755 --- a/bin/install.sh +++ b/bin/install.sh @@ -20,7 +20,7 @@ OS=`uname` SPARK_VERSION=2.1.1 # Looks like support for Elasticsearch 2.0 will require 2.0 so deferring -ELASTICSEARCH_VERSION=5.5.2 +ELASTICSEARCH_VERSION=5.6.9 HBASE_VERSION=1.2.6 POSTGRES_VERSION=42.0.0 MYSQL_VERSION=5.1.41 diff --git a/build.sbt b/build.sbt index 8b69b0338..e0b875b93 100644 --- a/build.sbt +++ b/build.sbt @@ -41,11 +41,11 @@ version in ThisBuild := "0.14.0-SNAPSHOT" organization in ThisBuild := "org.apache.predictionio" -scalaVersion in ThisBuild := sys.props.getOrElse("scala.version", "2.11.8") +scalaVersion in ThisBuild := sys.props.getOrElse("scala.version", "2.11.12") scalaBinaryVersion in ThisBuild := binaryVersion(scalaVersion.value) -crossScalaVersions in ThisBuild := Seq("2.11.8") +crossScalaVersions in ThisBuild := Seq("2.11.12") scalacOptions in ThisBuild ++= Seq("-deprecation", "-unchecked", "-feature") @@ -56,7 +56,7 @@ javacOptions in (ThisBuild, compile) ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint:deprecation", "-Xlint:unchecked") // Ignore differentiation of Spark patch levels -sparkVersion in ThisBuild := sys.props.getOrElse("spark.version", "2.1.1") +sparkVersion in ThisBuild := sys.props.getOrElse("spark.version", "2.1.2") sparkBinaryVersion in ThisBuild := binaryVersion(sparkVersion.value) @@ -64,7 +64,7 @@ akkaVersion in ThisBuild := sys.props.getOrElse( "akka.version", scalaSparkDepsVersion(scalaBinaryVersion.value)(sparkBinaryVersion.value)("akka")) -lazy val es = sys.props.getOrElse("elasticsearch.version", "5.5.2") +lazy val es = sys.props.getOrElse("elasticsearch.version", "5.6.9") elasticsearchVersion in ThisBuild := es diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template index 16ebcd31f..3cd2415f9 100644 --- a/conf/pio-env.sh.template +++ b/conf/pio-env.sh.template @@ -89,7 +89,7 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio # PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost # 
PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200 # PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http -# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.5.2 +# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.6.9 # Optional basic HTTP auth # PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME=my-name # PIO_STORAGE_SOURCES_ELASTICSEARCH_PASSWORD=my-secret diff --git a/conf/pio-vendors.sh b/conf/pio-vendors.sh index 162372f53..0489e855b 100644 --- a/conf/pio-vendors.sh +++ b/conf/pio-vendors.sh @@ -32,7 +32,7 @@ if [ -z "$PIO_HADOOP_VERSION" ]; then fi if [ -z "$PIO_ELASTICSEARCH_VERSION" ]; then - PIO_ELASTICSEARCH_VERSION="5.5.2" + PIO_ELASTICSEARCH_VERSION="5.6.9" fi ES_MAJOR=`echo $PIO_ELASTICSEARCH_VERSION | awk -F. '{print $1}'` @@ -42,7 +42,7 @@ if [ "$ES_MAJOR" = "1" ]; then export ES_TAG="1" else export ES_IMAGE="docker.elastic.co/elasticsearch/elasticsearch" - export ES_TAG="5.5.2" + export ES_TAG="5.6.9" fi PGSQL_JAR=postgresql-9.4-1204.jdbc41.jar diff --git a/docs/manual/data/versions.yml b/docs/manual/data/versions.yml index 31a065998..46b03e0e3 100644 --- a/docs/manual/data/versions.yml +++ b/docs/manual/data/versions.yml @@ -1,7 +1,7 @@ pio: 0.13.0 spark: 2.1.1 spark_download_filename: spark-2.1.1-bin-hadoop2.6 -elasticsearch_download_filename: elasticsearch-5.5.2 +elasticsearch_download_filename: elasticsearch-5.6.9 hbase_version: 1.2.6 hbase_basename: hbase-1.2.6 hbase_variant: bin diff --git a/docs/manual/source/install/install-sourcecode.html.md.erb b/docs/manual/source/install/install-sourcecode.html.md.erb index ed6996533..ee3085c95 100644 --- a/docs/manual/source/install/install-sourcecode.html.md.erb +++ b/docs/manual/source/install/install-sourcecode.html.md.erb @@ -30,7 +30,7 @@ building against * Scala 2.11.8 * Spark 2.1.1 * Hadoop 2.7.3 -* Elasticsearch 5.5.2 +* Elasticsearch 5.6.9 Download [binary release from an Apache mirror](https://www.apache.org/dyn/closer.lua/predictionio/<%= data.versions.pio @@ 
-105,7 +105,7 @@ Apache PredictionIO®. By default, the build will be against * Scala 2.11.8 * Spark 2.1.1 * Hadoop 2.7.3 -* Elasticsearch 5.5.2 +* Elasticsearch 5.6.9 ``` $ tar zxvf apache-predictionio-<%= data.versions.pio %>.tar.gz diff --git a/project/build.properties b/project/build.properties index 0cd8b0798..5f528e474 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.2.3 +sbt.version=1.2.3 \ No newline at end of file diff --git a/storage/elasticsearch/build.sbt b/storage/elasticsearch/build.sbt index 754aefbc7..f340136eb 100644 --- a/storage/elasticsearch/build.sbt +++ b/storage/elasticsearch/build.sbt @@ -21,12 +21,12 @@ name := "apache-predictionio-data-elasticsearch" elasticsearchSparkArtifact := (if (majorVersion(sparkVersion.value) == 2) "elasticsearch-spark-20" else "elasticsearch-spark-13") -elasticsearchVersion := (if (majorVersion(elasticsearchVersion.value) < 5) "5.5.2" else elasticsearchVersion.value) +elasticsearchVersion := (if (majorVersion(elasticsearchVersion.value) < 5) "5.6.9" else elasticsearchVersion.value) libraryDependencies ++= Seq( "org.apache.predictionio" %% "apache-predictionio-core" % version.value % "provided", "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", - "org.elasticsearch.client" % "rest" % elasticsearchVersion.value, + "org.elasticsearch.client" % "elasticsearch-rest-client" % elasticsearchVersion.value, "org.elasticsearch" %% elasticsearchSparkArtifact.value % elasticsearchVersion.value exclude("org.apache.spark", "*"), "org.elasticsearch" % "elasticsearch-hadoop-mr" % elasticsearchVersion.value, diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala index 73ef1d0d2..15f223f81 100644 --- 
a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -40,17 +40,18 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin extends AccessKeys with Logging { implicit val formats = DefaultFormats.lossless private val estype = "accesskeys" - - ESUtils.createIndex(client, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + private val internalIndex = index + "_" + estype + + ESUtils.createIndex(client, internalIndex, + ESUtils.getNumberOfShards(config, internalIndex.toUpperCase), + ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase)) val mappingJson = (estype -> ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("key" -> ("type" -> "keyword")) ~ ("events" -> ("type" -> "keyword")))) - ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) + ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) def insert(accessKey: AccessKey): Option[String] = { val key = if (accessKey.key.isEmpty) generateKey else accessKey.key @@ -65,7 +66,7 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin try { val response = client.performRequest( "GET", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map.empty[String, String].asJava) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) (jsonResponse \ "found").extract[Boolean] match { @@ -79,11 +80,11 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin e.getResponse.getStatusLine.getStatusCode match { case 404 => None case _ => - error(s"Failed to access to /$index/$estype/$id", e) + error(s"Failed to access to /$internalIndex/$estype/$id", e) None } case e: IOException => - error(s"Failed to access to /$index/$estype/$id", e) + 
error(s"Failed to access to /$internalIndex/$estype/$id", e) None } } @@ -93,10 +94,10 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[AccessKey](client, index, estype, compact(render(json))) + ESUtils.getAll[AccessKey](client, internalIndex, estype, compact(render(json))) } catch { case e: IOException => - error("Failed to access to /$index/$estype/_search", e) + error("Failed to access to /$internalIndex/$estype/_search", e) Nil } } @@ -107,10 +108,10 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin ("query" -> ("term" -> ("appid" -> appid))) - ESUtils.getAll[AccessKey](client, index, estype, compact(render(json))) + ESUtils.getAll[AccessKey](client, internalIndex, estype, compact(render(json))) } catch { case e: IOException => - error("Failed to access to /$index/$estype/_search", e) + error("Failed to access to /$internalIndex/$estype/_search", e) Nil } } @@ -121,7 +122,7 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map("refresh" -> "true").asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) @@ -130,11 +131,11 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin case "created" => case "updated" => case _ => - error(s"[$result] Failed to update $index/$estype/$id") + error(s"[$result] Failed to update $internalIndex/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $index/$estype/$id", e) + error(s"Failed to update $internalIndex/$estype/$id", e) } } @@ -142,18 +143,18 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin try { val response = client.performRequest( "DELETE", - 
s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map("refresh" -> "true").asJava) val json = parse(EntityUtils.toString(response.getEntity)) val result = (json \ "result").extract[String] result match { case "deleted" => case _ => - error(s"[$result] Failed to update $index/$estype/id") + error(s"[$result] Failed to update $internalIndex/$estype/id") } } catch { case e: IOException => - error(s"Failed to update $index/$estype/id", e) + error(s"Failed to update $internalIndex/$estype/id", e) } } } diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala index ba480658e..cb17af8eb 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala @@ -40,18 +40,20 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) extends Apps with Logging { implicit val formats = DefaultFormats.lossless private val estype = "apps" - private val seq = new ESSequences(client, config, index) + private val internalIndex = index + "_" + estype - ESUtils.createIndex(client, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + private val seq = new ESSequences(client, config, internalIndex) + + ESUtils.createIndex(client, internalIndex, + ESUtils.getNumberOfShards(config, internalIndex.toUpperCase), + ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase)) val mappingJson = (estype -> ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("id" -> ("type" -> "keyword")) ~ ("name" -> ("type" -> "keyword")))) - ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) + ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) def 
insert(app: App): Option[Int] = { val id = app.id match { @@ -74,7 +76,7 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) try { val response = client.performRequest( "GET", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map.empty[String, String].asJava) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) (jsonResponse \ "found").extract[Boolean] match { @@ -88,11 +90,11 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) e.getResponse.getStatusLine.getStatusCode match { case 404 => None case _ => - error(s"Failed to access to /$index/$estype/$id", e) + error(s"Failed to access to /$internalIndex/$estype/$id", e) None } case e: IOException => - error(s"Failed to access to /$index/$estype/$id", e) + error(s"Failed to access to /$internalIndex/$estype/$id", e) None } } @@ -106,7 +108,7 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$index/$estype/_search", + s"/$internalIndex/$estype/_search", Map.empty[String, String].asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) @@ -119,7 +121,7 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) } } catch { case e: IOException => - error(s"Failed to access to /$index/$estype/_search", e) + error(s"Failed to access to /$internalIndex/$estype/_search", e) None } } @@ -129,10 +131,10 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) val json = ("query" -> ("match_all" -> Nil)) - ESUtils.getAll[App](client, index, estype, compact(render(json))) + ESUtils.getAll[App](client, internalIndex, estype, compact(render(json))) } catch { case e: IOException => - error("Failed to access to /$index/$estype/_search", e) + error("Failed to access to /$internalIndex/$estype/_search", e) Nil } } @@ -143,7 
+145,7 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON); val response = client.performRequest( "POST", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map("refresh" -> "true").asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) @@ -152,11 +154,11 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) case "created" => case "updated" => case _ => - error(s"[$result] Failed to update $index/$estype/$id") + error(s"[$result] Failed to update $internalIndex/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $index/$estype/$id", e) + error(s"Failed to update $internalIndex/$estype/$id", e) } } @@ -164,18 +166,18 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String) try { val response = client.performRequest( "DELETE", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map("refresh" -> "true").asJava) val json = parse(EntityUtils.toString(response.getEntity)) val result = (json \ "result").extract[String] result match { case "deleted" => case _ => - error(s"[$result] Failed to update $index/$estype/$id") + error(s"[$result] Failed to update $internalIndex/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $index/$estype/id", e) + error(s"Failed to update $internalIndex/$estype/id", e) } } } diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala index b5eb5c8fc..63b108f10 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala @@ -39,17 +39,18 @@ class ESChannels(client: RestClient, 
config: StorageClientConfig, index: String) extends Channels with Logging { implicit val formats = DefaultFormats.lossless private val estype = "channels" - private val seq = new ESSequences(client, config, index) - - ESUtils.createIndex(client, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + private val seq = new ESSequences(client, config, internalIndex) + private val internalIndex = index + "_" + estype + + ESUtils.createIndex(client, internalIndex, + ESUtils.getNumberOfShards(config, internalIndex.toUpperCase), + ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase)) val mappingJson = (estype -> ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("name" -> ("type" -> "keyword")))) - ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) + ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) def insert(channel: Channel): Option[Int] = { val id = channel.id match { @@ -72,7 +73,7 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) try { val response = client.performRequest( "GET", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map.empty[String, String].asJava) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) (jsonResponse \ "found").extract[Boolean] match { @@ -86,11 +87,11 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) e.getResponse.getStatusLine.getStatusCode match { case 404 => None case _ => - error(s"Failed to access to /$index/$estype/$id", e) + error(s"Failed to access to /$internalIndex/$estype/$id", e) None } case e: IOException => - error(s"Failed to access to /$index/$estype/$id", e) + error(s"Failed to access to /$internalIndex/$estype/$id", e) None } } @@ -101,10 +102,10 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) ("query" -> ("term" -> ("appid" -> appid))) - 
ESUtils.getAll[Channel](client, index, estype, compact(render(json))) + ESUtils.getAll[Channel](client, internalIndex, estype, compact(render(json))) } catch { case e: IOException => - error(s"Failed to access to /$index/$estype/_search", e) + error(s"Failed to access to /$internalIndex/$estype/_search", e) Nil } } @@ -115,7 +116,7 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map("refresh" -> "true").asJava, entity) val json = parse(EntityUtils.toString(response.getEntity)) @@ -124,12 +125,12 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) case "created" => true case "updated" => true case _ => - error(s"[$result] Failed to update $index/$estype/$id") + error(s"[$result] Failed to update $internalIndex/$estype/$id") false } } catch { case e: IOException => - error(s"Failed to update $index/$estype/$id", e) + error(s"Failed to update $internalIndex/$estype/$id", e) false } } @@ -138,18 +139,18 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String) try { val response = client.performRequest( "DELETE", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map("refresh" -> "true").asJava) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) val result = (jsonResponse \ "result").extract[String] result match { case "deleted" => case _ => - error(s"[$result] Failed to update $index/$estype/$id") + error(s"[$result] Failed to update $internalIndex/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $index/$estype/$id", e) + error(s"Failed to update $internalIndex/$estype/$id", e) } } } diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala 
b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala index eec5b64b4..02f7b9824 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala @@ -59,8 +59,8 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: ("dataSourceParams" -> ("type" -> "keyword")) ~ ("preparatorParams" -> ("type" -> "keyword")) ~ ("algorithmsParams" -> ("type" -> "keyword")) ~ - ("servingParams" -> ("type" -> "keyword")) ~ - ("status" -> ("type" -> "keyword")))) + ("servingParams" -> ("type" -> "keyword")) + )) ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) def insert(i: EngineInstance): String = { diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala index 1706583ad..03b851d49 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala @@ -41,11 +41,12 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind extends EvaluationInstances with Logging { implicit val formats = DefaultFormats + new EvaluationInstanceSerializer private val estype = "evaluation_instances" - private val seq = new ESSequences(client, config, index) - - ESUtils.createIndex(client, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + private val seq = new ESSequences(client, config, internalIndex) + private val internalIndex = index + "_" + estype + + 
ESUtils.createIndex(client, internalIndex, + ESUtils.getNumberOfShards(config, internalIndex.toUpperCase), + ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase)) val mappingJson = (estype -> ("_all" -> ("enabled" -> false)) ~ @@ -59,7 +60,7 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind ("evaluatorResults" -> ("type" -> "text")) ~ ("evaluatorResultsHTML" -> ("enabled" -> false)) ~ ("evaluatorResultsJSON" -> ("enabled" -> false)))) - ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) + ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) def insert(i: EvaluationInstance): String = { val id = i.id match { @@ -82,7 +83,7 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind try { val response = client.performRequest( "GET", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map.empty[String, String].asJava) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) (jsonResponse \ "found").extract[Boolean] match { @@ -96,11 +97,11 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind e.getResponse.getStatusLine.getStatusCode match { case 404 => None case _ => - error(s"Failed to access to /$index/$estype/$id", e) + error(s"Failed to access to /$internalIndex/$estype/$id", e) None } case e: IOException => - error(s"Failed to access to /$index/$estype/$id", e) + error(s"Failed to access to /$internalIndex/$estype/$id", e) None } } @@ -110,10 +111,10 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind val json = ("query" -> ("match_all" -> List.empty)) - ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json))) + ESUtils.getAll[EvaluationInstance](client, internalIndex, estype, compact(render(json))) } catch { case e: IOException => - error("Failed to access to /$index/$estype/_search", e) + error("Failed to access to 
/$internalIndex/$estype/_search", e) Nil } } @@ -127,10 +128,10 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind ("sort" -> ("startTime" -> ("order" -> "desc"))) - ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json))) + ESUtils.getAll[EvaluationInstance](client, internalIndex, estype, compact(render(json))) } catch { case e: IOException => - error("Failed to access to /$index/$estype/_search", e) + error("Failed to access to /$internalIndex/$estype/_search", e) Nil } } @@ -141,7 +142,7 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map("refresh" -> "true").asJava, entity) val json = parse(EntityUtils.toString(response.getEntity)) @@ -150,11 +151,11 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind case "created" => case "updated" => case _ => - error(s"[$result] Failed to update $index/$estype/$id") + error(s"[$result] Failed to update $internalIndex/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $index/$estype/$id", e) + error(s"Failed to update $internalIndex/$estype/$id", e) } } @@ -162,18 +163,18 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind try { val response = client.performRequest( "DELETE", - s"/$index/$estype/$id", + s"/$internalIndex/$estype/$id", Map("refresh" -> "true").asJava) val json = parse(EntityUtils.toString(response.getEntity)) val result = (json \ "result").extract[String] result match { case "deleted" => case _ => - error(s"[$result] Failed to update $index/$estype/$id") + error(s"[$result] Failed to update $internalIndex/$estype/$id") } } catch { case e: IOException => - error(s"Failed to update $index/$estype/$id", e) + error(s"Failed to update 
$internalIndex/$estype/$id", e) } } } diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala index 018ef85c5..d43ecc6cd 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala @@ -38,23 +38,24 @@ import grizzled.slf4j.Logging class ESSequences(client: RestClient, config: StorageClientConfig, index: String) extends Logging { implicit val formats = DefaultFormats private val estype = "sequences" + private val internalIndex = index + "_" + estype - ESUtils.createIndex(client, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + ESUtils.createIndex(client, internalIndex, + ESUtils.getNumberOfShards(config, internalIndex.toUpperCase), + ESUtils.getNumberOfReplicas(config, internalIndex.toUpperCase)) val mappingJson = (estype -> ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("n" -> ("enabled" -> false)))) - ESUtils.createMapping(client, index, estype, compact(render(mappingJson))) + ESUtils.createMapping(client, internalIndex, estype, compact(render(mappingJson))) def genNext(name: String): Long = { try { val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", - s"/$index/$estype/$name", + s"/$internalIndex/$estype/$name", Map("refresh" -> "false").asJava, entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) @@ -65,11 +66,11 @@ class ESSequences(client: RestClient, config: StorageClientConfig, index: String case "updated" => (jsonResponse \ "_version").extract[Long] case _ => - throw new IllegalStateException(s"[$result] Failed to update $index/$estype/$name") + 
throw new IllegalStateException(s"[$result] Failed to update $internalIndex/$estype/$name") } } catch { case e: IOException => - throw new StorageClientException(s"Failed to update $index/$estype/$name", e) + throw new StorageClientException(s"Failed to update $internalIndex/$estype/$name", e) } } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Elasticsearch 6.x support > ------------------------- > > Key: PIO-168 > URL: https://issues.apache.org/jira/browse/PIO-168 > Project: PredictionIO > Issue Type: New Feature > Components: Core > Affects Versions: 0.13.0 > Reporter: Donald Szeto > Assignee: Alexander Merritt > Priority: Major > Fix For: 0.14.0 > > > This is a JIRA ticket for tracking the pull request from [~emergentorder] > that adds support for Elasticsearch 6.x. -- This message was sent by Atlassian JIRA (v7.6.3#76005)