Re: PLEASE - [KAFKA-1194] Log Retention is always throwing IOException

2017-07-06 Thread Manikumar
Kafka is not well tested on the Windows platform. There are some known issues
when running on Windows. It is recommended to run Kafka on Linux machines.

On Thu, Jul 6, 2017 at 9:49 PM, M. Manna  wrote:

> Hi,
>
> I have sent numerous emails in the past about the same issue, but no
> response so far.
>
> I was wondering if we will get a patch. I am working on a synchronisation
> PoC which relies on log cleanup being successful every day. I have set
> auto.offset.reset=earliest, and offsets.retention.minutes is set
> sufficiently large (2880) to avoid such issues. I do have
> log.cleanup.policy=compact, but my understanding from the documentation is
> that deletion occurs after compaction for older logs anyway.
>
> I have moved my log.dir location to several places, all with full access to
> the directories, but I still see the same issue as KAFKA-1194. Could someone
> please let me know if this has worked for you, i.e. whether the logs were
> deleted successfully after a certain period?
>
> KR,
> MM
>


Re: Mirroring multiple clusters into one

2017-07-06 Thread James Cheng
Answers inline below.

-James

Sent from my iPhone

> On Jul 7, 2017, at 1:18 AM, Vahid S Hashemian  
> wrote:
> 
> James,
> 
> Thanks for sharing your thoughts and experience.
> Could you please also confirm whether
> - you do any encryption for the mirrored data?
Not at the Kafka level. The data goes over a VPN.

> - you have a many-to-one mirroring similar to what I described?
> 

Yes, we mirror multiple source clusters to a single target cluster. We have a 
topic naming convention where our topics are prefixed with their cluster name, 
so as long as we follow that convention, each source topic gets mirrored to a 
unique target topic. That is, we try not to have multiple mirrormakers writing 
to a single target topic. 

Our topic names in the target cluster get prefixed with the string "mirror." 
And then we never mirror topics that start with "mirror." This prevents us from 
creating mirroring loops.
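
As an illustration only (the whitelist is a Java regular expression, so a
negative lookahead can express "everything except topics starting with
mirror."; this is a sketch, not necessarily our exact configuration):

bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config consumer.properties \
  --producer.config producer.properties \
  --num.streams 4 \
  --whitelist '^(?!mirror\.).*'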

> Thanks.
> --Vahid
> 
> 
> 
> From:   James Cheng 
> To: users@kafka.apache.org
> Cc: dev 
> Date:   07/06/2017 12:37 PM
> Subject:Re: Mirroring multiple clusters into one
> 
> 
> 
> I'm not sure what the "official" recommendation is. At TiVo, we *do* run 
> all our mirrormakers near the target cluster. It works fine for us, but 
> we're still fairly inexperienced, so I'm not sure how strong of a data 
> point we should be.
> 
> I think the thought process is, if you are mirroring from a source cluster 
> to a target cluster where there is a WAN between the two, then whichever 
> request goes across the WAN has a higher chance of intermittent failure 
> than the one over the LAN. That means that if mirrormaker is near the 
> source cluster, the produce request over the WAN to the target cluster may 
> fail. If the mirrormaker is near the target cluster, then the fetch 
> request over the WAN to the source cluster may fail.
> 
> Failed fetch requests don't have much impact on data replication, it just 
> delays it. Whereas a failure during a produce request may introduce 
> duplicates.
> 
> Becket Qin from LinkedIn did a presentation on tuning producer performance 
> at a meetup last year, and I remember he specifically talked about 
> producing over a WAN as one of the cases where you have to tune settings. 
> Maybe that presentation will give more ideas about what to look at. 
> https://www.slideshare.net/mobile/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600
> 
> 
> -James
> 
> Sent from my iPhone
> 
>> On Jul 6, 2017, at 1:00 AM, Vahid S Hashemian 
>  wrote:
>> 
>> The literature suggests running the MM on the target cluster when 
> possible 
>> (with the exception of when encryption is required for transferred 
> data).
>> I am wondering if this is still the recommended approach when mirroring 
>> from multiple clusters to a single cluster (i.e. multiple MM instances).
>> Is there anything in particular (metric, specification, etc.) to 
> consider 
>> before making a decision?
>> 
>> Thanks.
>> --Vahid
>> 
>> 
> 
> 
> 
> 


Re: connect in 0.11.0.0 warnings due to class not found exceptions

2017-07-06 Thread Koert Kuipers
I did not have log4j.logger.org.reflections=ERROR, because I didn't update
my log4j files yet. I will do this now.

Connect seems to start up fine.

I still wonder why it's searching for gson. Where does it even get the idea
to start searching for gson? I don't use gson, and neither does Connect, it
seems?

On Thu, Jul 6, 2017 at 8:09 PM, Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi Koert,
>
> these warnings appear to be produced during the class scanning that Connect
> is performing when it's starting up. Connect is using org.reflections to
> discover plugins (Connectors, Transformations, Converters) in the various
> locations that it's configured to search for plugins.
> (such locations are entries in the plugin.path property as well as the
> supplied CLASSPATH). It's normally safe to ignore the warnings.
>
> I would expect that such warnings would be disabled by having:
>
> log4j.logger.org.reflections=ERROR
>
> in config/connect-log4j.properties.
>
> Does this setting exist in your environment? Did you by any chance enable
> a different log level for org.reflections?
> Also, is Connect starting up successfully after all these warnings are
> logged?
>
> Konstantine
>
>
> On Thu, Jul 6, 2017 at 3:33 PM, Koert Kuipers  wrote:
>
> > i just did a test upgrade to kafka 0.11.0.0 and i am seeing lots of
> > ClassNotFoundException in the logs for connect-distributed upon startup,
> > see below. is this expected? kind of curious why its looking for say gson
> > while gson jar is not in libs folder.
> > best,
> > koert
> >
> >
> > [2017-07-06 22:20:41,844] INFO Reflections took 6944 ms to scan 65 urls,
> > producing 3136 keys and 25105 values  (org.reflections.Reflections:232)
> > [2017-07-06 22:20:42,126] WARN could not get type for name
> > org.osgi.framework.BundleListener from any class loader
> > (org.reflections.Reflections:396)
> > org.reflections.ReflectionsException: could not get type for name
> > org.osgi.framework.BundleListener
> > at org.reflections.ReflectionUtils.forName(
> > ReflectionUtils.java:390)
> > at
> > org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
> > at org.reflections.Reflections.<init>(Reflections.java:126)
> > at
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> > scanPluginPath(DelegatingClassLoader.java:221)
> > at
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> > scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
> > at
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> > initLoaders(DelegatingClassLoader.java:159)
> > at
> > org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
> > at
> > org.apache.kafka.connect.cli.ConnectDistributed.main(
> > ConnectDistributed.java:63)
> > Caused by: java.lang.ClassNotFoundException:
> > org.osgi.framework.BundleListener
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > at org.reflections.ReflectionUtils.forName(
> > ReflectionUtils.java:388)
> > ... 7 more
> > [2017-07-06 22:20:42,223] WARN could not get type for name
> > com.google.gson.JsonDeserializer from any class loader
> > (org.reflections.Reflections:396)
> > org.reflections.ReflectionsException: could not get type for name
> > com.google.gson.JsonDeserializer
> > at org.reflections.ReflectionUtils.forName(
> > ReflectionUtils.java:390)
> > at
> > org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
> > at org.reflections.Reflections.<init>(Reflections.java:126)
> > at
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> > scanPluginPath(DelegatingClassLoader.java:221)
> > at
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> > scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
> > at
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> > initLoaders(DelegatingClassLoader.java:159)
> > at
> > org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
> > at
> > org.apache.kafka.connect.cli.ConnectDistributed.main(
> > ConnectDistributed.java:63)
> > Caused by: java.lang.ClassNotFoundException:
> > com.google.gson.JsonDeserializer
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > at org.reflections.ReflectionUtils.forName(
> > ReflectionUtils.java:388)
> > ... 7 more
> >
>


Kafka Broker Halting due to "ahead" offsets

2017-07-06 Thread Gaurav Agarwal
Hi All,

In our 3-node test cluster running Kafka 0.10.0, we faced this error:

FATAL [2017-07-06 07:30:42,962]
kafka.server.ReplicaFetcherThread:[Logging$class:fatal:110] -
[ReplicaFetcherThread-0-0] - [ReplicaFetcherThread-0-0], Halting because
log truncation is not allowed for topic Topic3, Current leader 0's latest
offset 41170020 is less than replica 3's latest offset 41170083

Kafka cluster is configured with:
replication_factor:3, min_isr:2 and unclean_leader_election: disabled

There were some machine issues where node 1 crashed and rejoined after
30 seconds or so. Ideally, since min_isr is set to 2, another node should
have taken over, but for some reason the ISR for some of the topic partitions
consisted of only node 1 just before node 1 crashed.

It appears similar to issues described in:
https://issues.apache.org/jira/browse/KAFKA-3861
https://issues.apache.org/jira/browse/KAFKA-3410

What I wanted to know is :

(a) How do we handle such errors? The ISR is determined dynamically, and it is
quite possible that in times of trouble the troubled node will shrink the ISR
to just itself (e.g. due to a network disruption before crashing).
(b) Is this issue addressed in any way in future Kafka versions like
0.11.0? Will https://issues.apache.org/jira/browse/KAFKA-1211 prevent this
situation?

--
thanks,
gaurav


Error in Kafka startup with the JMX exporter - java.lang.IllegalArgumentException: Collector already registered that provides name: jmx_scrape_duration_seconds

2017-07-06 Thread karan alang
Hi All,
I'm getting an error starting up Kafka with the JMX exporter passed as a
java agent. The error is shown below - any ideas?

I'm trying to integrate Prometheus to collect Kafka metrics.


[root@nwk2-bdp-kafka-06 kafka]# KAFKA_OPTS="$KAFKA_OPTS
-javaagent:/usr/hdp/2.5.3.0-37/prometheus/jmx_prometheus_javaagent-0.9.jar=7071:/usr/hdp/2.5.3.0-37/prometheus/jmx_exporter.yaml"
./bin/kafka-server-start.sh config/server.properties
2017-07-07 00:44:31.781:INFO:ipjsoejs.Server:jetty-8.y.z-SNAPSHOT
2017-07-07 00:44:31.866:INFO:ipjsoejs.AbstractConnector:Started
SelectChannelConnector@0.0.0.0:7071
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
sun.instrument.InstrumentationImpl.loadClassAndStartAgent(InstrumentationImpl.java:386)
at
sun.instrument.InstrumentationImpl.loadClassAndCallPremain(InstrumentationImpl.java:401)

Caused by: java.lang.IllegalArgumentException: Collector already
registered that provides name: jmx_scrape_duration_seconds
at
io.prometheus.jmx.shaded.io.prometheus.client.CollectorRegistry.register(CollectorRegistry.java:54)
at
io.prometheus.jmx.shaded.io.prometheus.client.Collector.register(Collector.java:128)
at
io.prometheus.jmx.shaded.io.prometheus.client.Collector.register(Collector.java:121)
at
io.prometheus.jmx.shaded.io.prometheus.jmx.JavaAgent.premain(JavaAgent.java:38)


How to fetch offset for all the partitions of one topic through single KafkaClient function call (0.10.x)?

2017-07-06 Thread Martin Peng
Hi,

I am using Kafka client 0.10.2. Is there a way to fetch the latest committed
offset for all the partitions in one function call?

I am calling KafkaConsumer.committed() to get this for a single partition; is
there a simple way to batch-fetch offsets for all the partitions in a single
topic in one shot? And how do I fetch all partition offsets for multiple
topics?

Thanks
Martin
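
As far as I know there is no single batch call for this in the 0.10.x
consumer, but a small loop over each topic's partitions gets the same
result. A minimal sketch (broker address, group id, and topic names are
placeholders):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class CommittedOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");   // the group whose committed offsets we want
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
            for (String topic : Arrays.asList("topic1", "topic2")) {   // one or more topics
                for (PartitionInfo p : consumer.partitionsFor(topic)) {
                    TopicPartition tp = new TopicPartition(topic, p.partition());
                    // committed() returns null if the group has never committed for this partition
                    committed.put(tp, consumer.committed(tp));
                }
            }
            for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
                System.out.println(e.getKey() + " -> " + e.getValue());
            }
        }
    }
}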


Re: connect in 0.11.0.0 warnings due to class not found exceptions

2017-07-06 Thread Konstantine Karantasis
Hi Koert,

these warnings appear to be produced during the class scanning that Connect
is performing when it's starting up. Connect is using org.reflections to
discover plugins (Connectors, Transformations, Converters) in the various
locations that it's configured to search for plugins.
(such locations are entries in the plugin.path property as well as the
supplied CLASSPATH). It's normally safe to ignore the warnings.

I would expect that such warnings would be disabled by having:

log4j.logger.org.reflections=ERROR

in config/connect-log4j.properties.

Does this setting exist in your environment? Did you by any chance enable
a different log level for org.reflections?
Also, is Connect starting up successfully after all these warnings are
logged?

Konstantine


On Thu, Jul 6, 2017 at 3:33 PM, Koert Kuipers  wrote:

> i just did a test upgrade to kafka 0.11.0.0 and i am seeing lots of
> ClassNotFoundException in the logs for connect-distributed upon startup,
> see below. is this expected? kind of curious why its looking for say gson
> while gson jar is not in libs folder.
> best,
> koert
>
>
> [2017-07-06 22:20:41,844] INFO Reflections took 6944 ms to scan 65 urls,
> producing 3136 keys and 25105 values  (org.reflections.Reflections:232)
> [2017-07-06 22:20:42,126] WARN could not get type for name
> org.osgi.framework.BundleListener from any class loader
> (org.reflections.Reflections:396)
> org.reflections.ReflectionsException: could not get type for name
> org.osgi.framework.BundleListener
> at org.reflections.ReflectionUtils.forName(
> ReflectionUtils.java:390)
> at
> org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
> at org.reflections.Reflections.<init>(Reflections.java:126)
> at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> scanPluginPath(DelegatingClassLoader.java:221)
> at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
> at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> initLoaders(DelegatingClassLoader.java:159)
> at
> org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
> at
> org.apache.kafka.connect.cli.ConnectDistributed.main(
> ConnectDistributed.java:63)
> Caused by: java.lang.ClassNotFoundException:
> org.osgi.framework.BundleListener
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at org.reflections.ReflectionUtils.forName(
> ReflectionUtils.java:388)
> ... 7 more
> [2017-07-06 22:20:42,223] WARN could not get type for name
> com.google.gson.JsonDeserializer from any class loader
> (org.reflections.Reflections:396)
> org.reflections.ReflectionsException: could not get type for name
> com.google.gson.JsonDeserializer
> at org.reflections.ReflectionUtils.forName(
> ReflectionUtils.java:390)
> at
> org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
> at org.reflections.Reflections.<init>(Reflections.java:126)
> at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> scanPluginPath(DelegatingClassLoader.java:221)
> at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
> at
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> initLoaders(DelegatingClassLoader.java:159)
> at
> org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
> at
> org.apache.kafka.connect.cli.ConnectDistributed.main(
> ConnectDistributed.java:63)
> Caused by: java.lang.ClassNotFoundException:
> com.google.gson.JsonDeserializer
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at org.reflections.ReflectionUtils.forName(
> ReflectionUtils.java:388)
> ... 7 more
>


Re: Mirroring multiple clusters into one

2017-07-06 Thread Vahid S Hashemian
James,

Thanks for sharing your thoughts and experience.
Could you please also confirm whether
- you do any encryption for the mirrored data?
- you have a many-to-one mirroring similar to what I described?

Thanks.
--Vahid



From:   James Cheng 
To: users@kafka.apache.org
Cc: dev 
Date:   07/06/2017 12:37 PM
Subject:Re: Mirroring multiple clusters into one



I'm not sure what the "official" recommendation is. At TiVo, we *do* run 
all our mirrormakers near the target cluster. It works fine for us, but 
we're still fairly inexperienced, so I'm not sure how strong of a data 
point we should be.

I think the thought process is, if you are mirroring from a source cluster 
to a target cluster where there is a WAN between the two, then whichever 
request goes across the WAN has a higher chance of intermittent failure 
than the one over the LAN. That means that if mirrormaker is near the 
source cluster, the produce request over the WAN to the target cluster may 
fail. If the mirrormaker is near the target cluster, then the fetch 
request over the WAN to the source cluster may fail.

Failed fetch requests don't have much impact on data replication, it just 
delays it. Whereas a failure during a produce request may introduce 
duplicates.

Becket Qin from LinkedIn did a presentation on tuning producer performance 
at a meetup last year, and I remember he specifically talked about 
producing over a WAN as one of the cases where you have to tune settings. 
Maybe that presentation will give more ideas about what to look at. 
https://www.slideshare.net/mobile/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600


-James

Sent from my iPhone

> On Jul 6, 2017, at 1:00 AM, Vahid S Hashemian 
 wrote:
> 
> The literature suggests running the MM on the target cluster when 
possible 
> (with the exception of when encryption is required for transferred 
data).
> I am wondering if this is still the recommended approach when mirroring 
> from multiple clusters to a single cluster (i.e. multiple MM instances).
> Is there anything in particular (metric, specification, etc.) to 
consider 
> before making a decision?
> 
> Thanks.
> --Vahid
> 
> 






updating startup script in Apache Ambari

2017-07-06 Thread karan alang
hi All


i'm trying to configure/update Kafka startup script, so Prometheus is able
to read metrics from Kafka.

I need to add the following to the Broker startup script :

-javaagent:/path/to/jmx_exporter.jar=$PORT:$CONFIG

Where in Apache Ambari can i modify the Kafka Broker startup script ?

Pls note - i'm using Hortonworks 2.5.x
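
One common approach with an Ambari-managed stack is to append the agent to
KAFKA_OPTS in the kafka-env template (Kafka > Configs > Advanced kafka-env in
the Ambari UI) and let Ambari restart the brokers. A sketch only - the jar
path, port, and config path below are illustrative and not verified against
HDP 2.5:

# kafka-env template (Ambari: Kafka > Configs > Advanced kafka-env)
export KAFKA_OPTS="$KAFKA_OPTS -javaagent:/usr/hdp/current/kafka-broker/libs/jmx_prometheus_javaagent-0.9.jar=7071:/etc/kafka/conf/jmx_exporter.yaml"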


connect in 0.11.0.0 warnings due to class not found exceptions

2017-07-06 Thread Koert Kuipers
I just did a test upgrade to Kafka 0.11.0.0 and I am seeing lots of
ClassNotFoundExceptions in the logs for connect-distributed upon startup;
see below. Is this expected? I'm kind of curious why it's looking for, say,
gson while the gson jar is not in the libs folder.
best,
koert


[2017-07-06 22:20:41,844] INFO Reflections took 6944 ms to scan 65 urls,
producing 3136 keys and 25105 values  (org.reflections.Reflections:232)
[2017-07-06 22:20:42,126] WARN could not get type for name
org.osgi.framework.BundleListener from any class loader
(org.reflections.Reflections:396)
org.reflections.ReflectionsException: could not get type for name
org.osgi.framework.BundleListener
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at
org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.<init>(Reflections.java:126)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:221)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:159)
at
org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
at
org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:63)
Caused by: java.lang.ClassNotFoundException:
org.osgi.framework.BundleListener
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 7 more
[2017-07-06 22:20:42,223] WARN could not get type for name
com.google.gson.JsonDeserializer from any class loader
(org.reflections.Reflections:396)
org.reflections.ReflectionsException: could not get type for name
com.google.gson.JsonDeserializer
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at
org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.<init>(Reflections.java:126)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:221)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:159)
at
org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
at
org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:63)
Caused by: java.lang.ClassNotFoundException:
com.google.gson.JsonDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 7 more


Re: Using Kafka Producer inside Oracle DB

2017-07-06 Thread Stephen Durfey
Just to add to this: depending upon your use case it may be beneficial to
use Kafka Connect for pulling data out of Oracle to publish to Kafka. With
the JDBC connector you would just need a few configs to stand up Kafka
Connect and start publishing data to Kafka, either via a select statement
or a table whitelist.

http://docs.confluent.io/3.2.1/connect/index.html
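
As a rough sketch only (property names should be checked against the JDBC
connector documentation for your version; the connection details, table, and
column below are placeholders), a table-whitelist source connector config
looks something like:

name=oracle-jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:oracle:thin:user/password@//dbhost:1521/ORCL
table.whitelist=MY_TABLE
mode=incrementing
incrementing.column.name=ID
topic.prefix=oracle-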

On Thu, Jul 6, 2017 at 8:07 AM, Tauzell, Dave 
wrote:

> >> java.lang.NoClassDefFound Error
>
> You are missing some dependent classes.   Two questions:
>
> 1. Does the message have more information about what class it couldn't
> find?
> 2. What exactly are you putting into your jar file?
>
> -Dave
>
> -Original Message-
> From: Rahul R04 [mailto:rahul.kuma...@mphasis.com]
> Sent: Thursday, July 6, 2017 5:29 AM
> To: users@kafka.apache.org
> Subject: Using Kafka Producer inside Oracle DB
>
> Hi,
>
> I am trying to post Messages from Oracle to Kafka broker directly using
> custom java class for that I uploaded the required library in oracle and
> calling java function using oracle function/procedure. Basic java function
> is working fine but function in which Kafka is getting initialized and used
> throwing uncaught exception. Even I put whole function body inside
> try-catch block but still not getting my customized exception as message.
> I am having Oracle 12C that have JVM version 1.8.
>
> This message got popup on oracle SQL prompt at java function call
> (java.lang.NoClassDefFound Error).
>
> I created a jar file for my custom classes and uploaded it with it's
> respected libraries to oracle DB.
> I uploaded library files using below command.
>
> loadjava -u /@DB -resolve .jar
>
> Please tell how to get Kafka Producer initialized and start sending
> message, from inside oracle
>
> My custom classes are given below
>
> public class KafkaPublisher {
> static ProducerRecord PR = null; static Producer String> producer = null; static Properties prop = new Properties();
>
> public static void init() {
> prop.put("bootstrap.servers", "111.11.11.11:9092"); prop.put("acks",
> "1"); prop.put("retries", "0"); prop.put("batch.size", "16384"); prop.put("
> linger.ms", "1"); prop.put("buffer.memory", "33554432");
> prop.put("key.serializer", "org.apache.kafka.common.serialization.
> StringSerializer");
> prop.put("value.serializer", "org.apache.kafka.common.serialization.
> StringSerializer");
>
> }
>
> public static String push(String data) { String response = "No responce";
> init(); try { producer = new KafkaProducer(prop); PR = new
> ProducerRecord("topic1", data); Future
> future = producer.send(PR); RecordMetadata rMetaData = future.get();
> response = "Current Offset: " + rMetaData.offset(); } catch (Exception e) {
> response = "Message: " + e.getMessage() + " Cause: " + e.getCause(); }
> return response; }
>
> public static String getProperties(String msg) { return "My message: " +
> msg + prop.toString(); } }
>
> ## function getProperties(String msg) is working fine but function
> push(String data) is not working.
>
> Thanks and Regards,
>
> Rahul Kumar
>
>


Re: [DISCUSS] KIP-175: Additional '--describe' views for ConsumerGroupCommand

2017-07-06 Thread Jeff Widman
Thanks for the KIP Vahid. I think it'd be useful to have these filters.

That said, I also agree with Edo.

We don't currently rely on the output, but more than once while debugging an
issue I have noticed something amiss only because I saw all the data at once;
if it hadn't been present in the default view I probably would have missed it,
as I wouldn't have thought to look at that particular filter.

This would also be more consistent with the API of kafka-topics.sh, where
"--describe" gives everything and can then be filtered down.
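
For instance, with the 0.11 tools:

bin/kafka-topics.sh --zookeeper localhost:2181 --describe
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --under-replicated-partitions
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topics-with-overrides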



On Tue, Jul 4, 2017 at 10:42 AM, Edoardo Comar  wrote:

> Hi Vahid,
> no we are not relying on parsing the current output.
>
> I just thought that keeping the full output isn't necessarily that bad as
> it shows some sort of history of how a group was used.
>
> ciao
> Edo
> --
>
> Edoardo Comar
>
> IBM Message Hub
>
> IBM UK Ltd, Hursley Park, SO21 2JN
>
> "Vahid S Hashemian"  wrote on 04/07/2017
> 17:11:43:
>
> > From: "Vahid S Hashemian" 
> > To: d...@kafka.apache.org
> > Cc: "Kafka User" 
> > Date: 04/07/2017 17:12
> > Subject: Re: [DISCUSS] KIP-175: Additional '--describe' views for
> > ConsumerGroupCommand
> >
> > Hi Edo,
> >
> > Thanks for reviewing the KIP.
> >
> > Modifying the default behavior of `--describe` was suggested in the
> > related JIRA.
> > We could poll the community to see whether they go for that option, or,
> as
> > you suggested, introducing a new `--only-xxx` ( can't also think of a
> > proper name right now :) ) option instead.
> >
> > Are you making use of the current `--describe` output and relying on the
>
> > full data set?
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> >
> > From:   Edoardo Comar 
> > To: d...@kafka.apache.org
> > Cc: "Kafka User" 
> > Date:   07/04/2017 03:17 AM
> > Subject:Re: [DISCUSS] KIP-175: Additional '--describe' views for
>
> > ConsumerGroupCommand
> >
> >
> >
> > Thanks Vahid, I like the KIP.
> >
> > One question - could we keep the current "--describe" behavior unchanged
>
> > and introduce "--only-xxx" options to filter down the full output as you
>
> > proposed ?
> >
> > ciao,
> > Edo
> > --
> >
> > Edoardo Comar
> >
> > IBM Message Hub
> >
> > IBM UK Ltd, Hursley Park, SO21 2JN
> >
> >
> >
> > From:   "Vahid S Hashemian" 
> > To: dev , "Kafka User"
> 
> > Date:   04/07/2017 00:06
> > Subject:[DISCUSS] KIP-175: Additional '--describe' views for
> > ConsumerGroupCommand
> >
> >
> >
> > Hi,
> >
> > I created KIP-175 to make some improvements to the ConsumerGroupCommand
> > tool.
> > The KIP can be found here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-175%3A
> > +Additional+%27--describe%27+views+for+ConsumerGroupCommand
> >
> >
> >
> > Your review and feedback is welcome!
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> >
> >
> > Unless stated otherwise above:
> > IBM United Kingdom Limited - Registered in England and Wales with number
>
> > 741598.
> > Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> 3AU
> >
> >
> >
> >
>
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598.
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
>


RE: Kafka partitions are piled up for Consumer

2017-07-06 Thread Ghosh, Achintya (Contractor)
This is not the case for us. Does anyone have any other ideas, please? How can
we put a timeout on the Kafka thread?

Thanks!

-Original Message-
From: Naanu Bora [mailto:naanub...@gmail.com] 
Sent: Thursday, July 06, 2017 1:41 PM
To: users@kafka.apache.org
Subject: Re: Kafka partitions are piled up for Consumer

I am observing the same with our 0.10.2 cluster, where consumers hang for
those partitions whose current offset's data gets deleted due to retention.
It looks like a bug to me.

Thanks!

On Jul 6, 2017 9:23 AM, "Ghosh, Achintya (Contractor)" < 
achintya_gh...@comcast.com> wrote:

> Hi,
>
> If we have some high response in backend( let's say database high 
> response), the few of the partitions messages are piled up. Once the 
> backend gets normal still those partitions are not processing any 
> message and it creates a huge lag.
>
> Any idea why it happened? We thought once the backend gets back to 
> normal those partitions will be processing but its not happening. 
> Somehow some threads are stuck.
>
> Thanks
>
>


Loading and streaming data from Kafka to BigQuery

2017-07-06 Thread Ofir Sharony
Hi guys,

I would like to recommend the following post, discussing and comparing
techniques for loading data from Kafka to BigQuery.

https://medium.com/myheritage-engineering/kafka-to-bigquery-load-a-guide-for-streaming-billions-of-daily-events-cbbf31f4b737

Feedback is welcome.

*Ofir Sharony*
BackEnd Tech Lead

Mobile: +972-54-7560277 | ofir.shar...@myheritage.com | www.myheritage.com
MyHeritage Ltd., 3 Ariel Sharon St., Or Yehuda 60250, Israel




 



Consumer offset management in latest Kafka release

2017-07-06 Thread Bishnu Agrawal
Hello Team,

I have a few questions on consumer offset management in the latest release.
If you could spend a few minutes and answer the questions below, please base
your answers on the latest release:

1. Is there any dependency on ZooKeeper for storing consumer offsets?
2. Can you give some insight into the structure Kafka uses to store these
offsets?

Any link or pointer would really help me if you can provide.

Thanks,
Bishnu


Re: Mirroring multiple clusters into one

2017-07-06 Thread James Cheng
I'm not sure what the "official" recommendation is. At TiVo, we *do* run all 
our mirrormakers near the target cluster. It works fine for us, but we're still 
fairly inexperienced, so I'm not sure how strong of a data point we should be.

I think the thought process is, if you are mirroring from a source cluster to a 
target cluster where there is a WAN between the two, then whichever request 
goes across the WAN has a higher chance of intermittent failure than the one 
over the LAN. That means that if mirrormaker is near the source cluster, the 
produce request over the WAN to the target cluster may fail. If the mirrormaker 
is near the target cluster, then the fetch request over the WAN to the source 
cluster may fail.

Failed fetch requests don't have much impact on data replication, it just 
delays it. Whereas a failure during a produce request may introduce duplicates.

Becket Qin from LinkedIn did a presentation on tuning producer performance at a 
meetup last year, and I remember he specifically talked about producing over a 
WAN as one of the cases where you have to tune settings. Maybe that 
presentation will give more ideas about what to look at. 
https://www.slideshare.net/mobile/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600
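
To give a flavour of the knobs involved, a mirrormaker producer.properties
tuned for a high-latency link typically raises batching, buffering, and
socket buffer sizes along these lines (values are illustrative, not
recommendations from the talk):

bootstrap.servers=target-broker-1:9092
acks=all
compression.type=lz4
batch.size=262144
linger.ms=50
buffer.memory=134217728
send.buffer.bytes=1048576
max.in.flight.requests.per.connection=1
request.timeout.ms=60000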

-James

Sent from my iPhone

> On Jul 6, 2017, at 1:00 AM, Vahid S Hashemian  
> wrote:
> 
> The literature suggests running the MM on the target cluster when possible 
> (with the exception of when encryption is required for transferred data).
> I am wondering if this is still the recommended approach when mirroring 
> from multiple clusters to a single cluster (i.e. multiple MM instances).
> Is there anything in particular (metric, specification, etc.) to consider 
> before making a decision?
> 
> Thanks.
> --Vahid
> 
> 


Re: Kafka partitions are piled up for Consumer

2017-07-06 Thread Naanu Bora
I am observing the same with our 0.10.2 cluster, where consumers hang
for those partitions whose current offset's data gets deleted due to
retention. It looks like a bug to me.

Thanks!

On Jul 6, 2017 9:23 AM, "Ghosh, Achintya (Contractor)" <
achintya_gh...@comcast.com> wrote:

> Hi,
>
> If we have some high response in backend( let's say database high
> response), the few of the partitions messages are piled up. Once the
> backend gets normal still those partitions are not processing any message
> and it creates a huge lag.
>
> Any idea why it happened? We thought once the backend gets back to normal
> those partitions will be processing but its not happening. Somehow some
> threads are stuck.
>
> Thanks
>
>


Re: Kafka connector throughput reduction upon avro schema change

2017-07-06 Thread Dave Hamilton
Bumping this. Has anyone here observed this in their Kafka connect deployments?

Thanks,
Dave


On 5/26/17, 1:44 PM, "Dave Hamilton"  wrote:

We are currently using the Kafka S3 connector to ship Avro data to S3. We 
made a change to one of our Avro schemas and have noticed consumer throughput 
on the Kafka connector drop considerably. I am wondering if there is anything 
we can do to avoid such issues when we update schemas in the future?

This is what I believe is happening:


- The avro producer application is running on 12 instances. They
are restarted in a rolling fashion, switching from producing schema version 1
before the restart to schema version 2 afterward.

- While the rolling restart is occurring, data on schema version 1
and schema version 2 is simultaneously being written to the topic.

- The Kafka connector has to close the current avro file for a
partition and ship it whenever it detects a schema change, which is happening
several times due to the rolling nature of the schema update deployment and the
mixture of message versions being written during this time. This process causes
the overall consumer throughput to plummet.

Am I reasoning correctly about what we’re observing here? Is there any way 
to avoid this when we change schemas (short of stopping all instances of the 
service and bringing them up together on the new schema version)?

Thanks,
Dave
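
One setting that may be worth checking (hedging here, since I don't know your
exact connector config): the Confluent storage connectors have a
schema.compatibility option, and with the default of NONE every schema change
forces a file rotation, while a mode such as BACKWARD lets the connector
project records to the latest compatible schema instead of rotating on each
change. Illustrative snippet:

schema.compatibility=BACKWARD
flush.size=10000
rotate.interval.ms=600000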





Kafka partitions are piled up for Consumer

2017-07-06 Thread Ghosh, Achintya (Contractor)
Hi,

If we have high response times in the backend (let's say high database
response times), messages for a few of the partitions are piled up. Once the
backend gets back to normal, those partitions are still not processing any
messages, and this creates a huge lag.

Any idea why this happened? We thought that once the backend got back to
normal those partitions would resume processing, but it's not happening.
Somehow some threads are stuck.

Thanks



PLEASE - [KAFKA-1194] Log Retention is always throwing IOException

2017-07-06 Thread M. Manna
Hi,

I have sent numerous emails in the past about the same issue, but no
response so far.

I was wondering if we will get a patch. I am working on a synchronisation
PoC which relies on log cleanup being successful every day. I have set
auto.offset.reset=earliest, and offsets.retention.minutes is set
sufficiently large (2880) to avoid such issues. I do have
log.cleanup.policy=compact, but my understanding from the documentation is
that deletion occurs after compaction for older logs anyway.

I have moved my log.dir location to several places, all with full access to
the directories, but I still see the same issue as KAFKA-1194. Could someone
please let me know if this has worked for you, i.e. whether the logs were
deleted successfully after a certain period?

KR,
MM


Unusual(?) behaviour in mirror maker cluster

2017-07-06 Thread Max Weaver
Hi,

We've deployed an EU-based mirror maker cluster to consume topics from
clusters situated in the US.

We have five brokers in the MM cluster, but it seems that one particular
broker consistently has higher throughput than all the others (which stay
around the same).

We are also having issues with lag, especially with certain topics. The
mirror maker is hosted on the target cluster and our producer / consumer
settings look like this:

producer.properties:

bootstrap.servers=localhost:9092
acks=1
retries=2147483647
client.id=mm.glb.data.producer

consumer.properties:

bootstrap.servers=us-broker-1:9092,...:9092
group.id=mm_data_production
exclude.internal.topics=true
client.id=mm.us1.data.consumer
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

The mirror maker itself is running as a service, which looks like:

[Unit]
Description=Kafka-mm
After=network.target

[Service]
User=kafka
Group=kafka
Environment=KAFKA_HEAP_OPTS=-Xmx1G
SyslogIdentifier=kafka-mm
ExecStart=/opt/kafka/bin/kafka-run-class.sh kafka.tools.MirrorMaker
--consumer.config /opt/kafka/config/consumer.properties --num.streams 24
--producer.config /opt/kafka/config/producer.properties --whitelist="*"
Restart=on-failure

[Install]
WantedBy=multi-user.target


each topic has 24 partitions and a replication factor of 2. There are 9
topics in total.

Perhaps someone could come up with an idea why this is happening?

Many thanks

Max


Re: Kafka Streams 0.10.2.1 client crash - .checkpoint.tmp (No such file or directory)

2017-07-06 Thread Ian Duffy
Hi Damian,

Sorry for the delayed reply have been out of office.

I'm afraid I cannot check. We have alarms on our auto scaling groups for
stream instances to kill them should the CPU utilization be < 1 for 30
mins.

On Fri 30 Jun 2017 at 5:05 p.m., Damian Guy  wrote:

> Hi Ian,
>
> Can you check if the file exists and it is indeed a file rather then a
> directory?
>
> Thanks,
> Damian
>
> On Fri, 30 Jun 2017 at 16:45 Damian Guy  wrote:
>
> > Hi Ian,
> >
> > We had another report of what looks like the same issue. Will look into
> it.
> >
> > Thanks,
> > Damian
> >
> > On Fri, 30 Jun 2017 at 16:38 Ian Duffy  wrote:
> >
> >> Hi All,
> >>
> >> I was wondering if anyone who knows stream internals could shed any
> >> light on the following exception:
> >>
> >> org.apache.kafka.streams.errors.ProcessorStateException: Error while
> >> closing the state manager at
> >>
> >>
> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:133)
> >> at
> >>
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.closeNonAssignedSuspendedTasks(StreamThread.java:898)
> >> at
> >>
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> >> at
> >>
> >>
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:233)
> >> at
> >>
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> >> at
> >>
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> >> at
> >>
> >>
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >> at
> >>
> >>
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >> at
> >>
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> >> at
> >>
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >> at
> >>
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> >> at
> >>
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >> Caused by: java.io.FileNotFoundException:
> >> /data/kafka-streams-freshness/freshness-id-prod/2_46/.checkpoint.tmp (No
> >> such file or directory) at java.io.FileOutputStream.open0(Native Method)
> >> at
> >> java.io.FileOutputStream.open(FileOutputStream.java:270) at
> >> java.io.FileOutputStream.(FileOutputStream.java:213) at
> >> java.io.FileOutputStream.(FileOutputStream.java:162) at
> >>
> >>
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:71)
> >> at
> >>
> >>
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:386)
> >> at
> >>
> >>
> org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:131)
> >> ... 11 common frames omitted
> >>
> >> Thanks,
> >> Ian.
> >>
> >
>


RE: Using Kafka Producer inside Oracle DB

2017-07-06 Thread Tauzell, Dave
>> java.lang.NoClassDefFound Error

You are missing some dependent classes.   Two questions:

1. Does the message have more information about what class it couldn't find?
2. What exactly are you putting into your jar file?

-Dave

-Original Message-
From: Rahul R04 [mailto:rahul.kuma...@mphasis.com]
Sent: Thursday, July 6, 2017 5:29 AM
To: users@kafka.apache.org
Subject: Using Kafka Producer inside Oracle DB

Hi,

I am trying to post Messages from Oracle to Kafka broker directly using custom 
java class for that I uploaded the required library in oracle and calling java 
function using oracle function/procedure. Basic java function is working fine 
but function in which Kafka is getting initialized and used throwing uncaught 
exception. Even I put whole function body inside try-catch block but still not 
getting my customized exception as message.
I am having Oracle 12C that have JVM version 1.8.

This message got popup on oracle SQL prompt at java function call 
(java.lang.NoClassDefFound Error).

I created a jar file for my custom classes and uploaded it with it's respected 
libraries to oracle DB.
I uploaded library files using below command.

loadjava -u /@DB -resolve .jar

Please tell how to get Kafka Producer initialized and start sending message, 
from inside oracle

My custom classes are given below

public class KafkaPublisher {
static ProducerRecord PR = null; static Producer producer = null; static Properties prop = new Properties();

public static void init() {
prop.put("bootstrap.servers", "111.11.11.11:9092"); prop.put("acks", "1"); 
prop.put("retries", "0"); prop.put("batch.size", "16384"); 
prop.put("linger.ms", "1"); prop.put("buffer.memory", "33554432"); 
prop.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");

}

public static String push(String data) { String response = "No responce"; 
init(); try { producer = new KafkaProducer(prop); PR = new 
ProducerRecord("topic1", data); Future future = 
producer.send(PR); RecordMetadata rMetaData = future.get(); response = "Current 
Offset: " + rMetaData.offset(); } catch (Exception e) { response = "Message: " 
+ e.getMessage() + " Cause: " + e.getCause(); } return response; }

public static String getProperties(String msg) { return "My message: " + msg + 
prop.toString(); } }

## function getProperties(String msg) is working fine but function push(String 
data) is not working.

Thanks and Regards,

Rahul Kumar



Order of ConsumerRecords obtained by records(TopicPartition) in Kafka Consumer API

2017-07-06 Thread M. Manna
Hello,

I had a question regarding the records(TopicPartition partition) method in
ConsumerRecords.

I am trying to use auto.offset.reset=earliest and combine it with manual
offset control. I saw this sample code under "Manual Offset Control":

https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

From the API, I can see that records(TopicPartition) says:

> public List<ConsumerRecord<K,V>> records(TopicPartition partition)
> Get just the records for the given partition



I am assuming that the "order" of ConsumerRecords is maintained in the list,
but I couldn't find where this is stated in the source (probably exhausted
eyes). Does anyone know where to find information about the ordering?

KR,

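For context, within a single partition records come back in offset order, and
the manual offset control example in that javadoc relies on exactly that: it
walks each partition's list in order and commits the last offset per
partition. A condensed sketch (broker address, group id, topic, and
deserializers are placeholders):

import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ManualOffsetControl {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("enable.auto.commit", "false");        // commit manually, per partition
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (TopicPartition partition : records.partitions()) {
                    // records for one partition, in offset order
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    // commit the offset of the next record to be consumed for this partition
                    consumer.commitSync(Collections.singletonMap(partition,
                            new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        }
    }
}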

Re: Kafka shutdown gracefully

2017-07-06 Thread Kamal C
Don't use `kill -9 PID`. Use `kill -s TERM PID` - it sends a signal asking the
process to end, and will trigger any cleanup routines before exiting.

Because the output of the `ps` command used by kafka-server-stop.sh exceeds
4096 characters, the script cannot find the broker process and shows "No kafka
server to stop".
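
As a workaround until the stop script is fixed, you can look up the broker PID
yourself and send SIGTERM (a sketch; the grep pattern may need adjusting for
your installation):

PID=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
kill -s TERM $PID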

On Thu, Jul 6, 2017 at 3:25 AM, Ghosh, Achintya (Contractor) <
achintya_gh...@comcast.com> wrote:

> Hi team,
>
> What is the command to shutdown kafka server gracefully instead of using
> 'kill -9 PID'?
>
> If we use bin/kafka-server-stop.sh it shows "No kafka server to stop" but
> the service actually running and I see the PID by using "ps -ef|grep kafka"
>
>
> Thanks
> Achintya
>


Broker leaving cluster--even though Kafka broker remains up

2017-07-06 Thread John Yost
Hi Everyone,

What causes a broker to leave a cluster even when the broker remains
running? Is it loss of sync with Zookeeper?

--John


Using Kafka Producer inside Oracle DB

2017-07-06 Thread Rahul R04
Hi,

I am trying to post messages from Oracle to a Kafka broker directly using a
custom Java class. For that, I uploaded the required libraries into Oracle and
am calling the Java function via an Oracle function/procedure. A basic Java
function works fine, but the function in which Kafka is initialized and used
throws an uncaught exception. Even though I put the whole function body inside
a try-catch block, I am still not getting my customized exception as the
message.
I am running Oracle 12c, which has JVM version 1.8.

This message pops up at the Oracle SQL prompt on the Java function call:
java.lang.NoClassDefFoundError.

I created a jar file for my custom classes and uploaded it together with its
respective libraries to the Oracle DB.
I uploaded the library files using the command below.

loadjava -u /@DB -resolve .jar

Please tell me how to get the Kafka producer initialized and start sending
messages from inside Oracle.

My custom classes are given below

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaPublisher {
    static ProducerRecord<String, String> PR = null;
    static Producer<String, String> producer = null;
    static Properties prop = new Properties();

    // Producer configuration; both keys and values are plain strings.
    public static void init() {
        prop.put("bootstrap.servers", "111.11.11.11:9092");
        prop.put("acks", "1");
        prop.put("retries", "0");
        prop.put("batch.size", "16384");
        prop.put("linger.ms", "1");
        prop.put("buffer.memory", "33554432");
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static String push(String data) {
        String response = "No response";
        init();
        try {
            producer = new KafkaProducer<String, String>(prop);
            PR = new ProducerRecord<String, String>("topic1", data);
            Future<RecordMetadata> future = producer.send(PR);
            RecordMetadata rMetaData = future.get(); // blocks until the send is acknowledged
            response = "Current Offset: " + rMetaData.offset();
        } catch (Exception e) {
            response = "Message: " + e.getMessage() + " Cause: " + e.getCause();
        }
        return response;
    }

    public static String getProperties(String msg) {
        return "My message: " + msg + prop.toString();
    }
}

## The function getProperties(String msg) works fine, but the function
push(String data) does not.
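
For what it's worth, a NoClassDefFoundError at call time inside the Oracle JVM
usually means a dependent jar (e.g. kafka-clients) was not loaded and resolved
in the same schema as the custom class. A sketch only - the file names and
credentials below are placeholders:

loadjava -user scott/tiger@DB -resolve kafka-clients-0.10.2.0.jar
loadjava -user scott/tiger@DB -resolve kafka-publisher.jar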

Thanks and Regards,

Rahul Kumar
Information transmitted by this e-mail is proprietary to Mphasis, its 
associated companies and/ or its customers and is intended 
for use only by the individual or entity to which it is addressed, and may 
contain information that is privileged, confidential or 
exempt from disclosure under applicable law. If you are not the intended 
recipient or it appears that this mail has been forwarded 
to you without proper authority, you are notified that any use or dissemination 
of this information in any manner is strictly 
prohibited. In such cases, please notify us immediately at 
mailmas...@mphasis.com and delete this mail from your records.