[jira] [Created] (FLINK-10457) Support SequenceFile for StreamingFileSink
Jihyun Cho created FLINK-10457:
----------------------------------

             Summary: Support SequenceFile for StreamingFileSink
                 Key: FLINK-10457
                 URL: https://issues.apache.org/jira/browse/FLINK-10457
             Project: Flink
          Issue Type: New Feature
          Components: Streaming Connectors
            Reporter: Jihyun Cho

SequenceFile is a major file format in the Hadoop ecosystem. Its files are simple to manage and easy to combine with other tools, so we still need SequenceFile support even though the sink already supports Parquet and ORC.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313362#comment-16313362 ]

Jihyun Cho commented on FLINK-8318:
-----------------------------------

Here is my code and pom files.
{code:title=Test.scala}
import java.net.{InetAddress, InetSocketAddress}
import java.util.{Properties, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write

object Test {
  val consumeProperties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka-001:9092")
    props.setProperty("group.id", "test")
    props.setProperty("auto.offset.reset", "latest")
    props
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.addSource(new FlinkKafkaConsumer010[String]("clova-log-dev", new SimpleStringSchema(), consumeProperties))

    val config = new java.util.HashMap[String, String]
    config.put("cluster.name", "test")
    val transportAddresses = new java.util.ArrayList[InetSocketAddress]
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("es-001"), 9300))

    val esSink = new ElasticsearchSink[String](config, transportAddresses,
      new ElasticsearchSinkFunction[String] {
        def createIndexRequest(t: String): IndexRequest = {
          Requests.indexRequest()
            .index("test")
            .`type`("message")
            .source(t)
        }

        override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer) = {
          requestIndexer.add(createIndexRequest(t))
        }
      }
    )

    stream.map { value =>
      try {
        val esDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", TimeZone.getTimeZone("UTC"))
        implicit val formats = DefaultFormats
        val json = parse(value)
        val transJson = json transformField {
          case JField("short_message", JString(s)) => ("message", JString(s))
          case JField("host", JString(s)) => ("source", JString(s))
          case JField("timestamp", JInt(i)) => ("timestamp", JString(esDateFormat.format((i * 1000L).toLong)))
          case JField("timestamp", JDouble(d)) => ("timestamp", JString(esDateFormat.format((d * 1000L).toLong)))
          case JField(k, v) => (k.stripPrefix("_"), v)
        }
        write(transJson)
      } catch {
        case _: Exception => ""
      }
    }.filter(_.nonEmpty).addSink(esSink)

    env.execute("Test")
  }
}
{code}
{code:title=pom.xml}
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test.streaming</groupId>
    <artifactId>test</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.4.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-contrib_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-native_2.11</artifactId>
            <version>3.5.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>test.streaming.Test</mainClass>
                        </transformer>
                    </transformers>
                    <finalName>${project.artifactId}</finalName>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
{code}
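As an aside (not part of the original comment): a common way to make a fat jar immune to this kind of dependency clash is to relocate the conflicting packages with the same maven-shade-plugin already configured above. The package names below are an illustrative assumption for the jackson conflict this issue is about, not a tested fix:

{code:title=shade relocation sketch}
<!-- Inside the maven-shade-plugin <configuration>: rename jackson classes
     in the user jar so they cannot collide with the jackson-core-2.2.3
     that the Hadoop classpath contributes via parent-first loading. -->
<relocations>
    <relocation>
        <pattern>com.fasterxml.jackson</pattern>
        <shadedPattern>test.streaming.shaded.jackson</shadedPattern>
    </relocation>
</relocations>
{code}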
[jira] [Comment Edited] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313362#comment-16313362 ]

Jihyun Cho edited comment on FLINK-8318 at 1/5/18 4:14 PM. The code and pom.xml are the same as in the previous comment, with the Kafka topic changed from "clova-log-dev" to "test".
[jira] [Commented] (FLINK-8318) Conflict jackson library with ElasticSearch connector
[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312448#comment-16312448 ]

Jihyun Cho commented on FLINK-8318:
-----------------------------------

When I tried the new default option {{child-first}}, the following error occurred.
{noformat}
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:346)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of test.streaming.Test$$anonfun$main$2 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
	... 8 more
{noformat}
So I changed back to the previous default, {{parent-first}}. Regardless of that option, I think the underlying problem is that Flink loads the Hadoop libraries even when {{HADOOP_CLASSPATH}} is not set.

> Conflict jackson library with ElasticSearch connector
> -----------------------------------------------------
>
>                 Key: FLINK-8318
>                 URL: https://issues.apache.org/jira/browse/FLINK-8318
>             Project: Flink
>          Issue Type: Bug
>          Components: ElasticSearch Connector, Startup Shell Scripts
>    Affects Versions: 1.4.0
>            Reporter: Jihyun Cho
>
> My Flink job fails after updating Flink to version 1.4.0. It uses the ElasticSearch connector.
> I'm using CDH Hadoop with the Flink option "classloader.resolve-order: parent-first"
> The failure log is below.
> {noformat}
> Using the result of 'hadoop classpath' to augment the Hadoop classpath: /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//*
> 2017-12-26 14:13:21,160 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -
> 2017-12-26 14:13:21,161 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
> 2017-12-26 14:13:21,161 INFO
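For reference, the classloading behaviour discussed in the comment above is controlled by a single entry in the Flink configuration. A minimal sketch of the setting the commenter reverted to (Flink 1.4 changed the default to {{child-first}}):

{code:title=flink-conf.yaml}
# Restores the pre-1.4 classloading order, which avoids the
# ClassCastException above by loading user classes through the
# same (parent) classloader as the framework classes.
classloader.resolve-order: parent-first
{code}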
[jira] [Created] (FLINK-8318) Conflict jackson library with ElasticSearch connector
Jihyun Cho created FLINK-8318:
---------------------------------

             Summary: Conflict jackson library with ElasticSearch connector
                 Key: FLINK-8318
                 URL: https://issues.apache.org/jira/browse/FLINK-8318
             Project: Flink
          Issue Type: Bug
          Components: ElasticSearch Connector, Startup Shell Scripts
    Affects Versions: 1.4.0
            Reporter: Jihyun Cho

My Flink job fails after updating Flink to version 1.4.0. It uses the ElasticSearch connector.
I'm using CDH Hadoop with the Flink option "classloader.resolve-order: parent-first"
The failure log is below.
{noformat}
Using the result of 'hadoop classpath' to augment the Hadoop classpath: /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//*
2017-12-26 14:13:21,160 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -
2017-12-26 14:13:21,161 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2017-12-26 14:13:21,161 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - OS current user: www
2017-12-26 14:13:21,446 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Current Hadoop/Kerberos user: www
2017-12-26 14:13:21,446 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
2017-12-26 14:13:21,447 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Maximum heap size: 31403 MiBytes
2017-12-26 14:13:21,447 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - JAVA_HOME: (not set)
2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Hadoop version: 2.6.5
2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - JVM Options:
2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xms32768M
2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xmx32768M
2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:MaxDirectMemorySize=8388607T
2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/
2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties
2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml
2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Program Arguments:
2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     --configDir
2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     /home/www/service/flink-1.4.0/conf
2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Classpath: ...:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//jackson-core-2.2.3.jar:...
2017-12-26 14:14:01,393 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Filter -> Map -> Filter -> Sink: Unnamed (3/10) (fb33a6e0c1a7e859eaef9cf8bcf4565e) switched from RUNNING to FAILED.
java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
	at org.elasticsearch.common.xcontent.json.JsonXContent.<clinit>(JsonXContent.java:76)
	at org.elasticsearch.common.xcontent.XContentType$1.xContent(XContentType.java:59)
	at org.elasticsearch.common.settings.Setting.arrayToParsableString(Setting.java:726)
	at org.elasticsearch.common.settings.Setting.lambda$listSetting$26(Setting.java:672)
	at
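Not from the thread itself, but a small diagnostic along these lines can confirm which jar a conflicting class is actually served from. The class implicated by the {{NoSuchFieldError}} is {{com.fasterxml.jackson.core.JsonFactory}}: {{FAIL_ON_SYMBOL_HASH_OVERFLOW}} only exists in jackson-core 2.4+, while the CDH classpath above contributes jackson-core-2.2.3.jar. The class {{WhichJar}} and its helper are hypothetical names for illustration:

```java
import java.security.CodeSource;

public class WhichJar {
    // Resolves which classpath location a class was loaded from. Useful to
    // verify that Hadoop's bundled jackson-core-2.2.3.jar is shadowing the
    // newer jackson-core required by the Elasticsearch 5 client.
    static String locate(String className) {
        try {
            CodeSource src = Class.forName(className)
                    .getProtectionDomain().getCodeSource();
            // JDK bootstrap classes report no code source.
            return src == null ? "(bootstrap classloader)"
                               : src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // In the failing TaskManager one would probe the conflicting class:
        //   locate("com.fasterxml.jackson.core.JsonFactory")
        // Demonstrated here with a JDK class, which any JVM can resolve.
        System.out.println(locate("java.lang.String"));
    }
}
```

Running this inside the job (or a Flink-loaded user function) shows whether the parent-first order resolved jackson from the Hadoop directories or from the user jar.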