[jira] [Created] (FLINK-10457) Support SequenceFile for StreamingFileSink

2018-09-28 Thread Jihyun Cho (JIRA)
Jihyun Cho created FLINK-10457:
--

 Summary: Support SequenceFile for StreamingFileSink
 Key: FLINK-10457
 URL: https://issues.apache.org/jira/browse/FLINK-10457
 Project: Flink
  Issue Type: New Feature
  Components: Streaming Connectors
Reporter: Jihyun Cho


SequenceFile is a major file format in the Hadoop ecosystem. Its files are simple to manage and easy to combine with other tools.
So we still need the SequenceFile format, even though Parquet and ORC are already supported.
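
A rough sketch of what this could look like from the user side, assuming a hypothetical {{SequenceFileWriterFactory}} (the name and constructor are placeholders, not an existing Flink API) plugged into the existing {{StreamingFileSink.forBulkFormat}} entry point:

{code:title=SequenceFileSinkSketch.scala}
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}

object SequenceFileSinkSketch {
  def attachSink(stream: DataStream[(LongWritable, Text)]): Unit = {
    // Hypothetical factory: a BulkWriter.Factory wrapping Hadoop's
    // SequenceFile.Writer, analogous to the existing Parquet/ORC writers.
    val factory = new SequenceFileWriterFactory[LongWritable, Text](
      new Configuration(), classOf[LongWritable], classOf[Text])

    val sink = StreamingFileSink
      .forBulkFormat(new Path("hdfs:///tmp/seq-output"), factory)
      .build()

    // Each bucket would then roll SequenceFile part files on checkpoints.
    stream.addSink(sink)
  }
}
{code}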



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8318) Conflict jackson library with ElasticSearch connector

2018-01-05 Thread Jihyun Cho (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313362#comment-16313362
 ] 

Jihyun Cho commented on FLINK-8318:
---

Here is my code and pom files.
{code:title=Test.scala}
import java.net.{InetAddress, InetSocketAddress}
import java.util.{Properties, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write

object Test {
  val consumeProperties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka-001:9092")
    props.setProperty("group.id", "test")
    props.setProperty("auto.offset.reset", "latest")
    props
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Consume raw JSON strings from Kafka.
    val stream = env.addSource(new FlinkKafkaConsumer010[String]("clova-log-dev", new SimpleStringSchema(), consumeProperties))

    val config = new java.util.HashMap[String, String]
    config.put("cluster.name", "test")

    val transportAddresses = new java.util.ArrayList[InetSocketAddress]
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("es-001"), 9300))

    val esSink = new ElasticsearchSink[String](config, transportAddresses,
      new ElasticsearchSinkFunction[String] {
        def createIndexRequest(t: String): IndexRequest = {
          Requests.indexRequest()
            .index("test")
            .`type`("message")
            .source(t)
        }

        override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer) = {
          requestIndexer.add(createIndexRequest(t))
        }
      }
    )

    // Rename GELF-style fields and convert epoch timestamps to an
    // Elasticsearch-friendly date string before indexing.
    stream.map { value =>
      try {
        val esDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", TimeZone.getTimeZone("UTC"))
        implicit val formats = DefaultFormats
        val json = parse(value)
        val transJson = json transformField {
          case JField("short_message", JString(s)) => ("message", JString(s))
          case JField("host", JString(s)) => ("source", JString(s))
          case JField("timestamp", JInt(i)) => ("timestamp", JString(esDateFormat.format((i * 1000L).toLong)))
          case JField("timestamp", JDouble(d)) => ("timestamp", JString(esDateFormat.format((d * 1000L).toLong)))
          case JField(k, v) => (k.stripPrefix("_"), v)
        }
        write(transJson)
      } catch {
        case _: Exception => "" // drop records that fail to parse
      }
    }.filter(_.nonEmpty).addSink(esSink)

    env.execute("Test")
  }
}
{code}
{code:title=pom.xml}
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>test.streaming</groupId>
    <artifactId>test</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.4.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-contrib_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-native_2.11</artifactId>
            <version>3.5.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>test.streaming.Test</mainClass>
                        </transformer>
                    </transformers>
                    <finalName>${project.artifactId}</finalName>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
{code}
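
As a possible workaround sketch (not something we have verified), the shade plugin above could additionally relocate jackson so the fat jar keeps its own copy instead of picking up jackson-core-2.2.3 from the Hadoop classpath:

{code:title=pom.xml (hypothetical relocation)}
<!-- Sketch only: would be added to the maven-shade-plugin <configuration> above. -->
<relocations>
    <relocation>
        <pattern>com.fasterxml.jackson</pattern>
        <shadedPattern>shaded.com.fasterxml.jackson</shadedPattern>
    </relocation>
</relocations>
{code}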



[jira] [Comment Edited] (FLINK-8318) Conflict jackson library with ElasticSearch connector

2018-01-05 Thread Jihyun Cho (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313362#comment-16313362
 ] 

Jihyun Cho edited comment on FLINK-8318 at 1/5/18 4:14 PM:
---

Here is my code and pom files.
{code:title=Test.scala}
import java.net.{InetAddress, InetSocketAddress}
import java.util.{Properties, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write

object Test {
  val consumeProperties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka-001:9092")
    props.setProperty("group.id", "test")
    props.setProperty("auto.offset.reset", "latest")
    props
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Consume raw JSON strings from Kafka.
    val stream = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), consumeProperties))

    val config = new java.util.HashMap[String, String]
    config.put("cluster.name", "test")

    val transportAddresses = new java.util.ArrayList[InetSocketAddress]
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("es-001"), 9300))

    val esSink = new ElasticsearchSink[String](config, transportAddresses,
      new ElasticsearchSinkFunction[String] {
        def createIndexRequest(t: String): IndexRequest = {
          Requests.indexRequest()
            .index("test")
            .`type`("message")
            .source(t)
        }

        override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer) = {
          requestIndexer.add(createIndexRequest(t))
        }
      }
    )

    // Rename GELF-style fields and convert epoch timestamps to an
    // Elasticsearch-friendly date string before indexing.
    stream.map { value =>
      try {
        val esDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", TimeZone.getTimeZone("UTC"))
        implicit val formats = DefaultFormats
        val json = parse(value)
        val transJson = json transformField {
          case JField("short_message", JString(s)) => ("message", JString(s))
          case JField("host", JString(s)) => ("source", JString(s))
          case JField("timestamp", JInt(i)) => ("timestamp", JString(esDateFormat.format((i * 1000L).toLong)))
          case JField("timestamp", JDouble(d)) => ("timestamp", JString(esDateFormat.format((d * 1000L).toLong)))
          case JField(k, v) => (k.stripPrefix("_"), v)
        }
        write(transJson)
      } catch {
        case _: Exception => "" // drop records that fail to parse
      }
    }.filter(_.nonEmpty).addSink(esSink)

    env.execute("Test")
  }
}
{code}
{code:title=pom.xml}
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>test.streaming</groupId>
    <artifactId>test</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.4.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-contrib_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-native_2.11</artifactId>
            <version>3.5.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>test.streaming.Test</mainClass>
                        </transformer>
                    </transformers>
                    <finalName>${project.artifactId}</finalName>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
{code}



[jira] [Commented] (FLINK-8318) Conflict jackson library with ElasticSearch connector

2018-01-04 Thread Jihyun Cho (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16312448#comment-16312448
 ] 

Jihyun Cho commented on FLINK-8318:
---

When I tried the new default option {{child-first}}, this error occurred:
{noformat}
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:346)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of test.streaming.Test$$anonfun$main$2 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
	... 8 more
{noformat}
So I changed it back to the previous default, {{parent-first}}.
Regardless of that option, I think the underlying problem is that Flink loads the Hadoop libraries even when {{HADOOP_CLASSPATH}} is not set.
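
For reference, this is the setting involved; a minimal sketch of the relevant {{flink-conf.yaml}} entry (key and values as documented for 1.4):

{code:title=flink-conf.yaml}
# child-first (the new 1.4 default): resolve classes from the user jar first.
# parent-first (the pre-1.4 behavior): resolve classes from Flink's own classpath first.
# The ClassCastException above is typical of child-first when the same class is
# visible to both the user-code classloader and the parent classloader.
classloader.resolve-order: parent-first
{code}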

> Conflict jackson library with ElasticSearch connector
> -
>
> Key: FLINK-8318
> URL: https://issues.apache.org/jira/browse/FLINK-8318
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector, Startup Shell Scripts
>Affects Versions: 1.4.0
>Reporter: Jihyun Cho
>
> My Flink job fails after updating Flink to 1.4.0. It uses the
> ElasticSearch connector.
> I'm using CDH Hadoop with the Flink option "classloader.resolve-order:
> parent-first".
> The failure log is below.
> {noformat}
> Using the result of 'hadoop classpath' to augment the Hadoop classpath: 
> /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//*
> 2017-12-26 14:13:21,160 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - 
> 2017-12-26 14:13:21,161 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
> 2017-12-26 14:13:21,161 INFO  
> 

[jira] [Created] (FLINK-8318) Conflict jackson library with ElasticSearch connector

2017-12-25 Thread Jihyun Cho (JIRA)
Jihyun Cho created FLINK-8318:
-

 Summary: Conflict jackson library with ElasticSearch connector
 Key: FLINK-8318
 URL: https://issues.apache.org/jira/browse/FLINK-8318
 Project: Flink
  Issue Type: Bug
  Components: ElasticSearch Connector, Startup Shell Scripts
Affects Versions: 1.4.0
Reporter: Jihyun Cho


My Flink job fails after updating Flink to 1.4.0. It uses the ElasticSearch connector.
I'm using CDH Hadoop with the Flink option "classloader.resolve-order: parent-first".
The failure log is below.

{noformat}
Using the result of 'hadoop classpath' to augment the Hadoop classpath: /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//*
2017-12-26 14:13:21,160 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - 
2017-12-26 14:13:21,161 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2017-12-26 14:13:21,161 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  OS current user: www
2017-12-26 14:13:21,446 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Current Hadoop/Kerberos user: www
2017-12-26 14:13:21,446 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
2017-12-26 14:13:21,447 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Maximum heap size: 31403 MiBytes
2017-12-26 14:13:21,447 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  JAVA_HOME: (not set)
2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop version: 2.6.5
2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  JVM Options:
2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -Xms32768M
2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -Xmx32768M
2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -XX:MaxDirectMemorySize=8388607T
2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/
2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties
2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml
2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Program Arguments:
2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - --configDir
2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager  - /home/www/service/flink-1.4.0/conf
2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager  -  Classpath:
...:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//jackson-core-2.2.3.jar:...

2017-12-26 14:14:01,393 INFO  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Filter -> Map -> Filter -> Sink: Unnamed (3/10) (fb33a6e0c1a7e859eaef9cf8bcf4565e) switched from RUNNING to FAILED.
java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
	at org.elasticsearch.common.xcontent.json.JsonXContent.<clinit>(JsonXContent.java:76)
	at org.elasticsearch.common.xcontent.XContentType$1.xContent(XContentType.java:59)
	at org.elasticsearch.common.settings.Setting.arrayToParsableString(Setting.java:726)
	at org.elasticsearch.common.settings.Setting.lambda$listSetting$26(Setting.java:672)
	at
{noformat}
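
{{FAIL_ON_SYMBOL_HASH_OVERFLOW}} was added to jackson-core in 2.4, so the error above means Elasticsearch 5 is resolving {{JsonFactory}} from the jackson-core-2.2.3.jar on the Hadoop classpath. A small diagnostic sketch (my own, not part of the log) to confirm which jar wins at runtime:

{code:title=JacksonProbe.scala}
import com.fasterxml.jackson.core.JsonFactory

// Prints the jar that JsonFactory was actually loaded from; run it inside
// the job (or on a TaskManager) to see whether Hadoop's old jackson-core wins.
object JacksonProbe {
  def main(args: Array[String]): Unit = {
    val src = classOf[JsonFactory].getProtectionDomain.getCodeSource
    println(if (src == null) "bootstrap/unknown" else src.getLocation)
  }
}
{code}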