Hi Flink-Devs,
I have created an OutputFormat for Elasticsearch, which connects through
the transport client.
When I run the job from the Eclipse IDE, it works fine.
But when I run the job from the command line or the Flink web interface, I
get a different error on each submission.
When I deploy the jar for the first time, I get:
java.lang.Exception: Configuring the OutputFormat (de.fraunhofer.fokus.odp.transformer.flink.iee2rdf.ESOutPutFormat@1a20270e) failed: JRE_IS_64BIT
    at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:80) ~[flink-runtime-0.8.1.jar:0.8.1]
    at org.apache.flink.runtime.jobmanager.JobManager.submitJob(JobManager.java:385) ~[flink-runtime-0.8.1.jar:0.8.1]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_45]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_45]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_45]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_45]
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420) [flink-runtime-0.8.1.jar:0.8.1]
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949) [flink-runtime-0.8.1.jar:0.8.1]
Caused by: java.lang.NoSuchFieldError: JRE_IS_64BIT
    at org.apache.lucene.util.RamUsageEstimator.<clinit>(RamUsageEstimator.java:145) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.apache.lucene.util.ArrayUtil.<clinit>(ArrayUtil.java:32) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.apache.lucene.util.BytesRefBuilder.grow(BytesRefBuilder.java:65) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.apache.lucene.util.BytesRefBuilder.copyChars(BytesRefBuilder.java:146) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.apache.lucene.util.BytesRefBuilder.copyChars(BytesRefBuilder.java:138) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.Strings.toUTF8Bytes(Strings.java:1018) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.Strings.toUTF8Bytes(Strings.java:1014) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.search.facet.filter.InternalFilterFacet.<clinit>(InternalFilterFacet.java:40) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.search.facet.TransportFacetModule.configure(TransportFacetModule.java:39) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.AbstractModule.configure(AbstractModule.java:60) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.spi.Elements$RecordingBinder.install(Elements.java:204) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.spi.Elements.getElements(Elements.java:85) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.InjectorShell$Builder.build(InjectorShell.java:130) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.InjectorBuilder.build(InjectorBuilder.java:99) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.Guice.createInjector(Guice.java:93) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.Guice.createInjector(Guice.java:70) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.ModulesBuilder.createInjector(ModulesBuilder.java:59) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:195) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:125) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at de.fraunhofer.fokus.odp.transformer.flink.iee2rdf.ESOutPutFormat.configure(ESOutPutFormat.java:87) ~[de.fraunhofer.fokus.odp.transformer.flink.iee2rdf_1.0.0.201511012145.jar:na]
    at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:77) ~[flink-runtime-0.8.1.jar:0.8.1]
    ... 7 common frames omitted
If I run the same jar again, it shows:
java.lang.Exception: Configuring the OutputFormat (de.fraunhofer.fokus.odp.transformer.flink.iee2rdf.ESOutPutFormat@1a20270e) failed: Could not initialize class org.elasticsearch.search.facet.filter.InternalFilterFacet
    at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:80) ~[flink-runtime-0.8.1.jar:0.8.1]
    at org.apache.flink.runtime.jobmanager.JobManager.submitJob(JobManager.java:385) ~[flink-runtime-0.8.1.jar:0.8.1]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_45]
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_45]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_45]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_45]
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420) [flink-runtime-0.8.1.jar:0.8.1]
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949) [flink-runtime-0.8.1.jar:0.8.1]
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.search.facet.filter.InternalFilterFacet
    at org.elasticsearch.search.facet.TransportFacetModule.configure(TransportFacetModule.java:39) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.AbstractModule.configure(AbstractModule.java:60) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.spi.Elements$RecordingBinder.install(Elements.java:204) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.spi.Elements.getElements(Elements.java:85) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.InjectorShell$Builder.build(InjectorShell.java:130) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.InjectorBuilder.build(InjectorBuilder.java:99) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.Guice.createInjector(Guice.java:93) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.Guice.createInjector(Guice.java:70) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.common.inject.ModulesBuilder.createInjector(ModulesBuilder.java:59) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:195) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:125) ~[de.fraunhofer.fokus.thirdparty.elasticsearch.jdk_1.0.0.jar:na]
    at de.fraunhofer.fokus.odp.transformer.flink.iee2rdf.ESOutPutFormat.configure(ESOutPutFormat.java:87) ~[de.fraunhofer.fokus.odp.transformer.flink.iee2rdf_1.0.0.201511012145.jar:na]
    at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:77) ~[flink-runtime-0.8.1.jar:0.8.1]
    ... 7 common frames omitted
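A side note on why the two submissions report different messages: as far as I understand JVM class initialization, once a static initializer has failed, the class is marked as unusable and every later access fails with NoClassDefFoundError ("Could not initialize class ...") instead of the original error. So the second trace is probably just a follow-up of the first one inside the same long-running JobManager JVM, not a separate problem. The sketch below reproduces that behaviour in isolation; `InitFailureDemo` and `Broken` are made-up names, and the thrown NoSuchFieldError only imitates the one from the trace.

```java
public class InitFailureDemo {

    // Hypothetical stand-in for a class whose static initializer fails,
    // like InternalFilterFacet in the traces above.
    static class Broken {
        // Not a compile-time constant, so reading it triggers class initialization.
        static final int VALUE = compute();

        static int compute() {
            throw new NoSuchFieldError("JRE_IS_64BIT");
        }
    }

    // Touches the class and reports which Throwable type comes back.
    static String tryAccess() {
        try {
            return String.valueOf(Broken.VALUE);
        } catch (Throwable t) {
            return t.getClass().getName();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryAccess()); // first access:  java.lang.NoSuchFieldError
        System.out.println(tryAccess()); // later access:  java.lang.NoClassDefFoundError
    }
}
```

If this reading is right, only the first trace (the NoSuchFieldError) points at the actual cause.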
My ESOutPutFormat looks like this:
    /*
     * (non-Javadoc)
     * @see org.apache.flink.api.common.io.OutputFormat#close()
     */
    @Override
    public void close() throws IOException {
        // Close the Elasticsearch client when the task finishes.
        try {
            client.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("DONE ES CLuster updated");
    }

    /*
     * (non-Javadoc)
     * @see org.apache.flink.api.common.io.OutputFormat#configure(org.apache.flink.configuration.Configuration)
     */
    @Override
    public void configure(Configuration arg0) {
        try {
            Settings settings = ImmutableSettings.settingsBuilder()
                    .put("cluster.name", "elasticsearch")
                    .build();
            @SuppressWarnings("resource")
            TransportClient transportClient = new TransportClient(settings); // Error occurred here
            transportClient = ((TransportClient) transportClient)
                    .addTransportAddress(new InetSocketTransportAddress("localhost", 9300));
            this.client = (Client) transportClient;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /*
     * (non-Javadoc)
     * @see org.apache.flink.api.common.io.OutputFormat#open(int, int)
     */
    @Override
    public void open(int arg0, int arg1) throws IOException {
        try {
        } catch (Exception e) {
            // TODO: handle exception
        }
    }

    /*
     * (non-Javadoc)
     * @see org.apache.flink.api.common.io.OutputFormat#writeRecord(java.lang.Object)
     */
    @Override
    public void writeRecord(Tuple2<String, String> input) throws IOException {
        System.out.println(" ES CLuster Indexing processing...");
        try {
            // Writing records
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
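To narrow this down, a check along the following lines could be run inside the job (for example at the start of configure()) to see which jar a class is actually loaded from on the cluster. This is only a sketch under assumptions: I am guessing that the missing JRE_IS_64BIT field is expected on org.apache.lucene.util.Constants, and that a second Lucene version on the classpath might be shadowing the one bundled with my Elasticsearch jar. `WhichJar` and `locationOf` are hypothetical helper names, not part of Flink or Elasticsearch.

```java
import java.security.CodeSource;

public class WhichJar {

    // Returns the code source (usually the jar URL) a class is loaded from,
    // without running the class's static initializers.
    static String locationOf(String className) {
        try {
            Class<?> c = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // JDK classes typically have no code source.
            return src == null ? "(bootstrap or unknown)" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable: " + e + ")";
        }
    }

    public static void main(String[] args) {
        // Assumption: this is the class that should declare JRE_IS_64BIT.
        System.out.println(locationOf("org.apache.lucene.util.Constants"));
        System.out.println(locationOf("org.apache.lucene.util.RamUsageEstimator"));
    }
}
```

If the two classes turn out to come from different jars, that would suggest a version conflict between Lucene copies rather than a problem in the OutputFormat itself.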
Thanks and Regards,
Santosh
--
View this message in context:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/job-failed-while-initiating-Transport-client-for-Elasticsearch-tp8833.html
Sent from the Apache Flink Mailing List archive at Nabble.com.