[jira] [Comment Edited] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313362#comment-16313362 ] Jihyun Cho edited comment on FLINK-8318 at 1/5/18 4:14 PM: --- Here are my code and pom files. {code:title=Test.scala} import java.net.{InetAddress, InetSocketAddress} import java.util.{Properties, TimeZone} import org.apache.commons.lang3.time.FastDateFormat import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer} import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.client.Requests import org.json4s.DefaultFormats import org.json4s.JsonAST._ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write object Test { val consumeProperties = { val props = new Properties() props.setProperty("bootstrap.servers", "kafka-001:9092") props.setProperty("group.id", "test") props.setProperty("auto.offset.reset", "latest") props } def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), consumeProperties)) val config = new java.util.HashMap[String, String] config.put("cluster.name", "test") val transportAddresses = new java.util.ArrayList[InetSocketAddress] transportAddresses.add(new InetSocketAddress(InetAddress.getByName("es-001"), 9300)) val esSink = new ElasticsearchSink[String](config, transportAddresses, new ElasticsearchSinkFunction[String] { def createIndexRequest(t: String): IndexRequest = { return Requests.indexRequest() .index("test") .`type`("message") .source(t) } override 
def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer) = { requestIndexer.add(createIndexRequest(t)) } } ) stream.map { value => try { val esDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", TimeZone.getTimeZone("UTC")) implicit val formats = DefaultFormats val json = parse(value) val transJson = json transformField { case JField("short_message", JString(s)) => ("message", JString(s)) case JField("host", JString(s)) => ("source", JString(s)) case JField("timestamp", JInt(i)) => ("timestamp", JString(esDateFormat.format((i * 1000L).toLong))) case JField("timestamp", JDouble(d)) => ("timestamp", JString(esDateFormat.format((d * 1000L).toLong))) case JField(k, v) => (k.stripPrefix("_"), v) } write(transJson) } catch { case _: Exception => "" } }.filter(_.nonEmpty).addSink(esSink) env.execute("Test") } } {code} {code:title=pom.xml} <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 test.streaming test jar 1.0-SNAPSHOT 1.4.0 org.apache.flink flink-streaming-scala_2.11 ${flink.version} org.apache.flink flink-streaming-contrib_2.11 ${flink.version} org.apache.flink flink-connector-kafka-0.10_2.11 ${flink.version} org.apache.flink flink-connector-elasticsearch5_2.11 ${flink.version} org.json4s json4s-native_2.11 3.5.3 org.apache.maven.plugins maven-compiler-plugin 1.8 1.8 net.alchim31.maven scala-maven-plugin 3.2.2 compile testCompile org.apache.maven.plugins maven-shade-plugin 2.4.3 test.streaming.Test
[jira] [Comment Edited] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308217#comment-16308217 ] Aljoscha Krettek edited comment on FLINK-8318 at 1/2/18 3:28 PM: - Do you have a specific reason for using {{parent-first}}? I think your case should work if you include ES and Jackson in your user jar and use {{child-first}}, which was introduced for exactly such cases of dependency clashes. was (Author: aljoscha): Do you have a specific reason for using `parent-first`. I think you case should work if you include ES and Jackson in your user jar and use `child-first`, which was introduced for exactly such cases of dependency clashes. > Conflict jackson library with ElasticSearch connector > - > > Key: FLINK-8318 > URL: https://issues.apache.org/jira/browse/FLINK-8318 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector, Startup Shell Scripts >Affects Versions: 1.4.0 >Reporter: Jihyun Cho > > My Flink job failed after updating the Flink version to 1.4.0. It uses the > ElasticSearch connector. > I'm using CDH Hadoop with the Flink option "classloader.resolve-order: > parent-first" > The failure log is below. 
> {noformat} > Using the result of 'hadoop classpath' to augment the Hadoop classpath: > /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//* > 2017-12-26 14:13:21,160 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > > 2017-12-26 14:13:21,161 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Starting > TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC) > 2017-12-26 14:13:21,161 INFO > org.apache.flink.runtime.taskmanager.TaskManager - OS current > user: www > 2017-12-26 14:13:21,446 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Current > Hadoop/Kerberos user: www > 2017-12-26 14:13:21,446 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JVM: Java > HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11 > 2017-12-26 14:13:21,447 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap > size: 31403 MiBytes > 2017-12-26 14:13:21,447 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: > (not set) > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Hadoop > version: 2.6.5 > 2017-12-26 14:13:21,448 INFO > 
org.apache.flink.runtime.taskmanager.TaskManager - JVM Options: > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - -Xms32768M > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - -Xmx32768M > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -XX:MaxDirectMemorySize=8388607T > 2017-12-26 14:13:21,448 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/ > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Program > Arguments: > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > --configDir > 2017-12-26 14:13:21,449 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > /home/www/service/flink-1.4.0/conf > 2017-12-26 14:13:21,449 INFO >