Looking around some more it could be a misconfiguration of the running
hadoop cluster and what you're trying to point to. So, double check the IP
address and port number for the hadoop cluster (in the hadoop cluster
config and make sure the port is open, reachable, and listening). If its
all running locally, make sure any port forwarding is setup correctly, if
applicable.

On Thu, Aug 3, 2017 at 9:16 AM, Debasish Ghosh <[email protected]>
wrote:

> Thanks for the heads up. In fact I am using both HDFS and Kafka Connect
> from Mesosphere repository. Hence expected some compatibility .. will take
> a look to see if they include different versions of protobuf.
>
> regards.
>
> On Thu, Aug 3, 2017 at 7:24 PM, Stephen Durfey <[email protected]> wrote:
>
> > That sounds like either a protobuf dependency compatibility issue between
> > what is on the classpath of kafka connect and the hadoop cluster you are
> > trying to write to (e.g. you're on a newer version of protobuf than your
> > cluster, or vice versa), or a wire incompatilibty of the communcation
> > protocol between the version you're writing with and the version of your
> > cluster.
> >
> > Hopefully that gives you something to look at.
> >
> > On Thu, Aug 3, 2017 at 7:29 AM, Debasish Ghosh <[email protected]
> >
> > wrote:
> >
> > > I installed the Confluent Connect package on DC/OS. It started a worker
> > on
> > > the cluster without any problem. I am using version 3.2.2 (compatible
> > with
> > > 0.10.2.1).
> > >
> > > Now I have  a Kafka streams application that generates Avro into a
> Kafka
> > > topic named avro-topic .. and I configure the HdfsSinkConnector as
> > follows
> > > ..
> > >
> > > curl -X POST -H "Content-Type: application/json" --data '{"name":
> > > > "ks-hdfs-sink", "config": {"connector.class":"HdfsSinkConnector",
> > > > "tasks.max":"1", "hdfs.url":"hdfs://10.8.0.18:18039",
> > > > "topics":"avro-topic", "flush.size":"1000" }}'
> > > > http://10.8.0.18:4904/connectors
> > >
> > >
> > > When avro is produced in the topic, I get the following exceptions in
> the
> > > log, which shuts down the HdfsSinkConnector ..
> > >
> > > [2017-08-03 12:13:06,031] INFO Couldn't start HdfsSinkConnector:
> > > (io.confluent.connect.hdfs.HdfsSinkTask)
> > > org.apache.kafka.connect.errors.ConnectException: java.io.IOException:
> > > Failed on local exception:
> > > com.google.protobuf.InvalidProtocolBufferException: Protocol message
> > > end-group tag did not match expected tag.; Host Details : local host
> is:
> > > "7923902e26b4/172.17.0.3"; destination host is: "10.8.0.18":18039;
> > > at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:202)
> > > at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:76)
> > > at
> > > org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(
> > > WorkerSinkTask.java:231)
> > > at
> > > org.apache.kafka.connect.runtime.WorkerSinkTask.
> > > execute(WorkerSinkTask.java:145)
> > > at org.apache.kafka.connect.runtime.WorkerTask.doRun(
> > WorkerTask.java:139)
> > > at org.apache.kafka.connect.runtime.WorkerTask.run(
> WorkerTask.java:182)
> > > at java.util.concurrent.Executors$RunnableAdapter.
> > call(Executors.java:511)
> > > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > > at
> > > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > > ThreadPoolExecutor.java:1142)
> > > at
> > > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > > ThreadPoolExecutor.java:617)
> > > at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.io.IOException: Failed on local exception:
> > > com.google.protobuf.InvalidProtocolBufferException: Protocol message
> > > end-group tag did not match expected tag.; Host Details : local host
> is:
> > > "7923902e26b4/172.17.0.3"; destination host is: "10.8.0.18":18039;
> > > at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
> > > at org.apache.hadoop.ipc.Client.call(Client.java:1479)
> > > at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> > > at
> > > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.
> > > invoke(ProtobufRpcEngine.java:229)
> > > at com.sun.proxy.$Proxy49.getFileInfo(Unknown Source)
> > > at
> > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslat
> > > orPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
> > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > at
> > > sun.reflect.NativeMethodAccessorImpl.invoke(
> > NativeMethodAccessorImpl.java:
> > > 62)
> > > at
> > > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > > DelegatingMethodAccessorImpl.java:43)
> > > at java.lang.reflect.Method.invoke(Method.java:498)
> > > at
> > > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
> > > RetryInvocationHandler.java:191)
> > > at
> > > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
> > > RetryInvocationHandler.java:102)
> > > at com.sun.proxy.$Proxy50.getFileInfo(Unknown Source)
> > > at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
> > > at
> > > org.apache.hadoop.hdfs.DistributedFileSystem$22.
> > > doCall(DistributedFileSystem.java:1305)
> > > at
> > > org.apache.hadoop.hdfs.DistributedFileSystem$22.
> > > doCall(DistributedFileSystem.java:1301)
> > > at
> > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
> > > FileSystemLinkResolver.java:81)
> > > at
> > > org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(
> > > DistributedFileSystem.java:1317)
> > > at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
> > > at io.confluent.connect.hdfs.storage.HdfsStorage.exists(
> > > HdfsStorage.java:66)
> > > at io.confluent.connect.hdfs.DataWriter.createDir(DataWriter.java:368)
> > > at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:170)
> > > ... 10 more
> > > Caused by: com.google.protobuf.InvalidProtocolBufferException:
> Protocol
> > > message end-group tag did not match expected tag.
> > > at
> > > com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(
> > > InvalidProtocolBufferException.java:94)
> > > at
> > > com.google.protobuf.CodedInputStream.checkLastTagWas(
> > > CodedInputStream.java:124)
> > > at
> > > com.google.protobuf.AbstractParser.parsePartialFrom(
> > > AbstractParser.java:202)
> > > at
> > > com.google.protobuf.AbstractParser.parsePartialDelimitedFrom(
> > > AbstractParser.java:241)
> > > at
> > > com.google.protobuf.AbstractParser.parseDelimitedFrom(
> > > AbstractParser.java:253)
> > > at
> > > com.google.protobuf.AbstractParser.parseDelimitedFrom(
> > > AbstractParser.java:259)
> > > at
> > > com.google.protobuf.AbstractParser.parseDelimitedFrom(
> > > AbstractParser.java:49)
> > > at
> > > org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.
> > > parseDelimitedFrom(RpcHeaderProtos.java:3167)
> > > at
> > > org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(
> > > Client.java:1086)
> > > at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979)
> > >
> > > It also throws anther exception while closing ..
> > >
> > > [2017-08-03 12:13:06,337] ERROR Task ks-hdfs-sink-0 threw an uncaught
> and
> > > unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
> > > java.lang.NullPointerException
> > > at io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkTask.java:121)
> > > at
> > > org.apache.kafka.connect.runtime.WorkerSinkTask.
> > > commitOffsets(WorkerSinkTask.java:317)
> > > at
> > > org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(
> > > WorkerSinkTask.java:480)
> > > at
> > > org.apache.kafka.connect.runtime.WorkerSinkTask.
> > > execute(WorkerSinkTask.java:152)
> > > at org.apache.kafka.connect.runtime.WorkerTask.doRun(
> > WorkerTask.java:139)
> > > at org.apache.kafka.connect.runtime.WorkerTask.run(
> WorkerTask.java:182)
> > > at java.util.concurrent.Executors$RunnableAdapter.
> > call(Executors.java:511)
> > > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > > at
> > > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > > ThreadPoolExecutor.java:1142)
> > > at
> > > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > > ThreadPoolExecutor.java:617)
> > > at java.lang.Thread.run(Thread.java:745)
> > >
> > > any pointer / help ?
> > >
> > > regards.
> > >
> > > --
> > > Debasish Ghosh
> > > http://manning.com/ghosh2
> > > http://manning.com/ghosh
> > >
> > > Twttr: @debasishg
> > > Blog: http://debasishg.blogspot.com
> > > Code: http://github.com/debasishg
> > >
> >
>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>

Reply via email to