Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-24 Thread Jan Filipiak

Hi Damian,

thanks for taking the time. I think you read my points individually but 
you seem to not understand the bigger picture I am trying to paint.


Of the three problems I mentioned - and that you agreed are 
problems - you are only trying to address the first.


What I am trying to tell you is that if you focus on the latter two, the 
first one comes for free. On the other hand, if you focus only on the first -
and please allow me to call it the easy part - all you are going to achieve 
is to break user land and sugar-coat the real problems.


This takes away overloads, but still leaves it a mess to implement new 
features. I am currently trying to prep a patch for KAFKA-3705 and
I do not understand why I should deal with Interactive Queries 
whatsoever. My output table has a proper ValueGetterSupplier.

That should be it!

I hope I made clear that improving here requires quite some hard work, that 
it would be rewarding, and that just sugar-coating everything

is one of the worst steps we could take from where we are at the moment.

Looking at KAFKA-5581 that you mentioned: none of the points are really 
related to what I am saying. Each of these points is tricky and

requires some careful thinking, but might work out.

Further, looking at your comment that refers to KIP vs. DISCUSS, I don't 
know what I should take from that.


Regarding your comment that getQueryHandle() wouldn't work: 
it is the same thing as giving the user a queryable string.
It works the same way, the only difference being that we have a wrapper 
object that gives the user what he wants instantly. Instead of handing 
him a String
to look up a store, we just give him the store, plus we don't hand out 
inflexible native types that we later have no control over.

The whole logic about partitioners and what else does not change.
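
To make the shape of that idea concrete, here is a minimal sketch. QueryHandle 
and getQueryHandle() are made-up names for the proposal; nothing like this 
exists in the current API, and partition routing / metadata lookups would stay 
exactly as they are today:

public interface QueryHandle<K, V> {
    // reads are resolved through the table's ValueGetterSupplier / processor name,
    // not through a separately materialized "queryable" store
    V get(K key);
}

// roughly: QueryHandle<String, Long> handle = countsTable.getQueryHandle();
//          Long count = handle.get("some-key");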

Hope this makes my points more clear.

Best Jan


On 19.07.2017 12:03, Damian Guy wrote:

Hi Jan,

Thanks for your input. Comments inline

On Tue, 18 Jul 2017 at 15:21 Jan Filipiak  wrote:


Hi,


1. Too many overloads:
Currently, whenever a KTable is the result of an operation it gets an
overload with stateStoreName, and another with a StateStoreSupplier, in case
people want to query that.
This is what produces 2/3rds of the overloaded methods right now (not
counting methods returning KStream)



As you state further down we are trying to address this.



2. Code copy and pasting.
Almost all KTableProcessorSuppliers have the same block of (if(name !=
null) store.put(k,v))



Yes, I agree. That is related to the KTable queryable store etc., and can
easily be addressed, but isn't necessarily part of this as it doesn't need
to be a public interface change, i.e., we can clean that up in the
background.



3. Runtime inefficiencies.
Each queryable table almost instantly causes another store to be
required, storing data equivalent to that of the upstream KTables.


Agreed. Again, this is not a public interface change. We don't need another
store, i.e., we can just use a "View" on the existing store, which is
basically just using the KTableValueGetter to apply the map or filter
operation to the original store. We also have this jira
https://issues.apache.org/jira/browse/KAFKA-5581 to look into optimizing
when we do and don't need to add additional changelogs.



So I really see us tackling only the first part currently, which in my
opinion is too short-sighted to settle on a public API.


We are not settling on the public API. We do, however, need to do KIPs for
public API discussions. For internal changes we don't necessarily need to
have a public discussion about it.



This is why I want to tackle our approach to IQ first, as it seems to me
to be the most disruptive thing, and the cause of most problems.

The Plan:

Table from topic, kstream (don't even like this one, but probably needed
for some kind of enhanced flexibility) or aggregations would be the only
KTables that would get associated with a statestore (their processors).
For these operations one can have the "statestoresupplier" overload but
also not the "queryablestatestore" overload. From this point on the KTable
abstraction would be considered restored.
All the overloads of join and through with respect to IQ would go away.
"through" would maybe go away completely, depending on the benefit it adds.
The method I would add is for a table to get a QueryHandle.
This query handle would underneath remember its table's processor name. To
access the data from IQ we would not rely on the "per processor
statestore" but go the usual path through the ValueGetterSupplier.
*Note:* We do not necessarily have a Serde for V, especially after
mapValues, and also not for any intermediate data types. It would be each
KTableProcessor's job to provide a serialized version of upstream data types.
A KTable-KTable join would need to bring a JoinInputSerializer that
would serialize both upstream values for transport across boxes.

This first step would kill all the "Storename" based overloads + many
Statestore 

how about kafka test

2017-07-24 Thread fuyou
Kafka is a distributed, partitioned, replicated commit-log message
queue service. I am interested in how Kafka is tested.

Transactional messages, exactly-once messaging, replication - is there any
documentation about how Kafka is tested?

How is it ensured that Kafka works as expected?

-- 
   =

  fuyou001
Best Regards


Re: Logs truncated at o'clock

2017-07-24 Thread mosto...@gmail.com

anyone?


On 17/07/17 15:24, mosto...@gmail.com wrote:


ping?


On 13/07/17 17:09, mosto...@gmail.com wrote:


Hi

With Swiss precision, our Kafka test environment seems to truncate 
topics at o'clock hours.


This might be confirmed with the following trace, which states 
"Truncating log ... to offset 0"


We are still using Kafka 0.10.2.1, but I was wondering whether this is 
resolved in recent versions, whether it's a known bug, or whether it is just a 
misconfiguration (I think the relevant config is shown below).


Thanks

Jul 13 16:00:00 computer kafka[28511]: [2017-07-13 16:00:00,986]
INFO [TopicChangeListener on Controller 1002]: New topics:
[Set(mytopic.2017-07-13-16)], deleted topics: [Set()], new
partition replica assignment [Map([mytopic.2017-07-13-16,0] ->
List(1001, 1002), [mytopic.2017-07-13-16,2] -> List(1003, 1001),
[mytopic.2017-07-13-16,1] -> List(1002, 1003))]
(kafka.controller.PartitionStateMachine$TopicChangeListener)
Jul 13 16:00:00 computer kafka[28511]: [2017-07-13 16:00:00,987]
INFO [Controller 1002]: New topic creation callback for

[mytopic.2017-07-13-16,0],[mytopic.2017-07-13-16,2],[mytopic.2017-07-13-16,1]
(kafka.controller.KafkaController)
Jul 13 16:00:00 computer kafka[28511]: [2017-07-13 16:00:00,988]
INFO [Controller 1002]: New partition creation callback for

[mytopic.2017-07-13-16,0],[mytopic.2017-07-13-16,2],[mytopic.2017-07-13-16,1]
(kafka.controller.KafkaController)
Jul 13 16:00:00 computer kafka[28511]: [2017-07-13 16:00:00,988]
INFO [Partition state machine on Controller 1002]: Invoking state
change to NewPartition for partitions

[mytopic.2017-07-13-16,0],[mytopic.2017-07-13-16,2],[mytopic.2017-07-13-16,1]
(kafka.controller.PartitionStateMachine)
Jul 13 16:00:00 computer kafka[28511]: [2017-07-13 16:00:00,988]
INFO [Replica state machine on controller 1002]: Invoking state
change to NewReplica for replicas

[Topic=mytopic.2017-07-13-16,Partition=1,Replica=1003],[Topic=mytopic.2017-07-13-16,Partition=0,Replica=1001],[Topic=mytopic.2017-07-13-16,Partition=2,Replica=1001],[Topic=mytopic.2017-07-13-16,Partition=1,Replica=1002],[Topic=mytopic.2017-07-13-16,Partition=2,Replica=1003],[Topic=mytopic.2017-07-13-16,Partition=0,Replica=1002]
(kafka.controller.ReplicaStateMachine)
Jul 13 16:00:00 computer kafka[28511]: [2017-07-13 16:00:00,993]
INFO [Partition state machine on Controller 1002]: Invoking state
change to OnlinePartition for partitions

[mytopic.2017-07-13-16,0],[mytopic.2017-07-13-16,2],[mytopic.2017-07-13-16,1]
(kafka.controller.PartitionStateMachine)
Jul 13 16:00:01 computer kafka[28511]: [2017-07-13 16:00:01,036]
INFO [Replica state machine on controller 1002]: Invoking state
change to OnlineReplica for replicas

[Topic=mytopic.2017-07-13-16,Partition=1,Replica=1003],[Topic=mytopic.2017-07-13-16,Partition=0,Replica=1001],[Topic=mytopic.2017-07-13-16,Partition=2,Replica=1001],[Topic=mytopic.2017-07-13-16,Partition=1,Replica=1002],[Topic=mytopic.2017-07-13-16,Partition=2,Replica=1003],[Topic=mytopic.2017-07-13-16,Partition=0,Replica=1002]
(kafka.controller.ReplicaStateMachine)
Jul 13 16:00:01 computer kafka[28511]: [2017-07-13 16:00:01,037]
INFO [ReplicaFetcherManager on broker 1002] Removed fetcher for
partitions mytopic.2017-07-13-16-1
(kafka.server.ReplicaFetcherManager)
Jul 13 16:00:01 computer kafka[28511]: [2017-07-13 16:00:01,039]
INFO Completed load of log mytopic.2017-07-13-16-1 with 1 log
segments and log end offset 0 in 0 ms (kafka.log.Log)
Jul 13 16:00:01 computer kafka[28511]: [2017-07-13 16:00:01,040]
INFO Created log for partition [mytopic.2017-07-13-16,1] in
/data/kafka-1 with properties {compression.type -> producer,
message.format.version -> 0.10.2-IV0, file.delete.delay.ms ->
6, max.message.bytes -> 112, min.compaction.lag.ms -> 0,
message.timestamp.type -> CreateTime, min.insync.replicas -> 1,
segment.jitter.ms -> 0, preallocate -> false,
min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096,
unclean.leader.election.enable -> true, retention.bytes -> -1,
delete.retention.ms -> 360, cleanup.policy -> [delete],
flush.ms -> 30, segment.ms -> 60, segment.bytes ->
1073741824, retention.ms -> 1440,
message.timestamp.difference.max.ms -> 9223372036854775807,
segment.index.bytes -> 10485760, flush.messages -> 1}.
(kafka.log.LogManager)
Jul 13 16:00:01 computer kafka[28511]: [2017-07-13 16:00:01,045]
INFO Partition [mytopic.2017-07-13-16,1] on broker 1002: No
checkpointed highwatermark is found for partition
mytopic.2017-07-13-16-1 (kafka.cluster.Partition)
Jul 13 16:00:01 computer kafka[28511]: [2017-07-13 16:00:01,047]
INFO Completed load of log mytopic.2017-07-13-16-0 with 1 log
segments and log end offset 0 in 1 ms (kafka.log.Log)
Jul 13 16:00:01 computer kafka[28511]: [2017-07-13 16:00:01,047]
  

Re: decreasing loglevel for basic creation-roll-delete events

2017-07-24 Thread mosto...@gmail.com

anyone?


On 17/07/17 15:21, mosto...@gmail.com wrote:


ping?


On 13/07/17 16:53, mosto...@gmail.com wrote:

Hi

While testing Kafka in our environment, we have noticed it creates A 
LOT of "debug" logs (not to be confused with topic logs!).

eg:
Jul 13 16:44:26 host kafka[28511]: [2017-07-13 16:44:26,879] INFO 
Deleting index 
/data/kafka-1/group-index-type.2017-07-13-12-0/00439596.timeindex.deleted 
(kafka.log.TimeIndex)


Config snippet:

default.replication.factor=2
delete.topic.enable=true
log.cleaner.delete.retention.ms=360
log.flush.interval.messages=1
log.flush.scheduler.interval.ms=6
log.retention.hours=4
log.roll.ms=60
num.partitions=3
reconfigEnabled=true


Wouldn't it make sense to *set index/segment creation, roll and 
deletion to a lower loglevel like FINE or DEBUG?*


Maybe (can I?) we could set "log4j.logger.kafka.log.Log=WARN", but 
IMHO it makes sense to reduce verbosity...
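
For what it's worth, the logger names that show up in those lines 
(kafka.log.Log, kafka.log.LogManager, kafka.log.TimeIndex) can be overridden 
individually in the broker's log4j.properties; a minimal sketch, where WARN is 
just our preference and not an official recommendation:

log4j.logger.kafka.log.Log=WARN
log4j.logger.kafka.log.LogManager=WARN
log4j.logger.kafka.log.TimeIndex=WARN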


Thanks







How to tune Kafka High Level Consumer for Production

2017-07-24 Thread Rachana Srivastava
I want to know how to tune/set up a high-level Kafka client against a Kafka server in 
EC2. I set zookeeper.session.timeout.ms=5. I found that after some time I 
got the following error in the logs. I want to know how to tune the Kafka parameters to 
run the consumer forever. I checked and found that ZK is running just fine; only the 
Kafka consumer is throwing an exception. I am using the 0.8.2.2 version of the Kafka 
client because my production Kafka server in EC2 is using the 0.8.2.2 version of 
Kafka.



executor = Executors.newFixedThreadPool(threadCount);
for (final KafkaStream stream : streams) {
    executor.submit(new EmailProcessorThread(stream, threadNumber, context, redisTemplate));
}

public void run() {
    while (true) {
        ConsumerIterator it = stream.iterator();
        while (it.hasNext()) {
            try {
                // read each message exactly once; calling it.next() twice (as the
                // original code did) silently consumes and drops every other message
                String recordValue = new String(it.next().message());
                LOG.info("Message received from the topic is " + recordValue);
                ObjectMapper mapper = new ObjectMapper();
                Email email = mapper.readValue(recordValue, Email.class);
                System.out.println("^ Email data received for processing is " + email.toString());
                MessageListener listener = new MessageListener();
                listener.processEmail(email, redisTemplate, context);
            } catch (Throwable e) {
                e.printStackTrace();
                LOG.info("Error received while processing message " + e);
            }
        }
    }
}
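
For reference, these are the old high-level consumer settings that usually 
govern session handling and rebalancing; the values below are placeholders, 
not recommendations:

zookeeper.session.timeout.ms=30000
zookeeper.connection.timeout.ms=30000
zookeeper.sync.time.ms=2000
rebalance.backoff.ms=2000
rebalance.max.retries=10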

2017-07-23 00:10:13.488  WARN 48209 --- [main-SendThread(10.202.138.126:2181)] 
org.apache.zookeeper.ClientCnxn  : Session 0x15ac97701bb91f0 for server 
:2181, unexpected error, closing socket connection and attempting 
reconnect

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_91]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) 
~[na:1.8.0_91]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) 
~[na:1.8.0_91]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_91]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) 
~[na:1.8.0_91]
at 
org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68) 
~[zookeeper-3.4.6.jar!/:3.4.6-1569965]
at 
org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
 ~[zookeeper-3.4.6.jar!/:3.4.6-1569965]
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081) 
~[zookeeper-3.4.6.jar!/:3.4.6-1569965]



Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

2017-07-24 Thread Ben Stopford
No worries Jose ;-)

So there are a few ways you could do this, but I think it’s important that
you manage a single “stock level” state store, backed by a changelog. Use
this for validation, and keep it up to date at the same time. You should
also ensure the input topic(s) are partitioned by productId so any update
to, or validation of, the same product will be sequenced. This effectively
ensures the mutations of the quantities in stock will be atomic.

So say we have two inputs: OrderRequests, StockUpdates

Order requests need to validate that there is sufficient stock, via the
product store, then decrement the stock value in that store:

public Event validateInventory(OrderRequestEvent order, KeyValueStore<> store) {

    Long stockCount = store.get(order.product);

    if (stockCount - order.quantity >= 0) {
        // decrement the value in the store
        store.put(order.product, stockCount - order.quantity);
        return new OrderValidatedEvent(Validation.Passed);
    } else {
        return new OrderValidatedEvent(Validation.Failed);
    }
}

Stock updates need to increase the stock value in the product store as new
stock arrives.

public void updateStockStore(StockUpdateEvent update, KeyValueStore<> store) {
    // look up the current stock level in the store (not on the update event)
    Long current = store.get(update.product);
    store.put(update.product, current + update.amount);
}

To do the processing we merge the input streams, then push this into a
transformer that uses a single state store to manage the mapping between
products and their stock levels.

KStream unvalidatedOrdersStream = builder.stream(orderTopic);

KStream stockStream = builder.stream(stockUpdateTopic);

StateStoreSupplier productStore = Stores.create(productStoreName)...build();

KStream orderOutputs =
    unvalidatedOrdersStream.outerJoin(stockStream, ...)
        .transform(StockCheckTransformer::new, productStoreName)
        .filter((key, value) -> value != "");

orderOutputs.to(validatedOrdersStream);


With the transformer both managing and validating against the stock levels.

StockCountTransformer { ….

    public KeyValue transform(ProductId key, Event event) {
        if (event.isStockUpdate()) {
            StockUpdateEvent update = parseStockUpdate(event);
            updateStockStore(update, productStore);
            return KeyValue.pair(key, update);
        } else if (event.isOrderRequest()) {
            return KeyValue.pair(key,
                validateInventory(parseOrderReq(event), productStore));
        }
        return null;
    }

}

Now the stock levels will be held in the changelog topic which backs the
ProductStore which we can reuse if we wish.

I think we could also optimise this code a bit by splitting into two
transformers via streams.branch(..).

Regarding EoS. This doesn’t add any magic to your processing logic. It just
guarantees that your stock count will be accurate in the face of failure
(i.e. you don’t need to manage idempotence yourself).
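
(For completeness, turning EoS on in Streams is a single config switch once
you are on 0.11; a minimal sketch, assuming the 0.11 property names, with a
placeholder application id and bootstrap address:)

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stock-service");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);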

B


On Sat, Jul 22, 2017 at 12:52 PM José Antonio Iñigo <
joseantonio.in...@gmail.com> wrote:

> Hi Garret,
>
> At the moment, to simplify the problem I only have one topic, orders, where
> I add products and decrement them based on ProductAdded and ProductReserved
> events.
>
> Yesterday I was reading about EoS but I don't know if it'll solve the
> problem. Dividing the query-update in two steps means that the event
> ordering could be:
>
> OrderPlaced (query stock ok)
> OrderPlaced (query stock ok)
> ProductReserved (update stock)
> ProductReserved (update stock)
>
> Regarding EoS this sequence is correct, the messages are delivered once in
> the order in which they were generated. The problem is the order itself: if
> there were a way to query-update-store-generate-event in one step to
> produce instead the following sequence of events there wouldn't be any
> problem:
>
> OrderPlaced->ProductReserved (query stock ok + Update stock store +
> reserved event)
> OrderPlaced->ProductNoStock (query stock fail so no update and out-of-stock
> event)
>
> Regards
>
> On Sat, 22 Jul 2017 at 05:35, Garrett Barton 
> wrote:
>
> > Could you take in both topics via the same stream? Meaning don't do a
> kafka
> > streams join, literally just read both streams. If KStream cant do this,
> > dunno haven't tried, then simple upstream merge job to throw them into 1
> > topic with same partitioning scheme.
> >
> > I'd assume you would have the products stream that would be some kind of
> > incrementer on state (within the local state store).  The Orders stream
> > would act as a decrement to the same stream task.  Exactly once semantics
> > and you skirt the issue of having to wait for the update to come back
> > around.
> >
> > Thoughts?
> >
> > On Fri, Jul 21, 2017 at 6:15 PM, José Antonio Iñigo <
> > joseantonio.in...@gmail.com> wrote:
> >
> > > Hi Chris,
> > >
> > >
> > >
> > > *"if I understand your problem correctly, the issue is that you need
> > > todecrement the stock count when you reserve it, rather than splitting
> > it*
> > > *into a second phase."*
> > >
> > > That's exactly the problem, I would need to:
> > >
> > > 1) Read the OrderPlaced event f

Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

2017-07-24 Thread Ewen Cheslack-Postava
Vahid,

Thanks for the KIP. I think we're mostly in violent agreement that the lack
of any Write permissions on consumer groups is confusing. Unfortunately
it's a pretty annoying issue to fix since it would require an increase in
permissions. More generally, I think it's unfortunate because by squeezing
all permissions into the lowest two levels, we have no room for refinement,
e.g. if we realize some permission needs to have a lower level of access
but higher than Describe, without adding new levels.

I'm +1 on the KIP. I don't think it's ideal given the discussion of Read vs
Write since I think Read is the correct permission in theory, but given
where we are now it makes sense.

Regarding the extra food for thought, I think such a change would require
some plan for how to migrate people over to it. The main proposal in the
KIP works without any migration plan because it is reducing the required
permissions, but changing the requirement for listing a group to Describe
(Group) would be adding/changing the requirements, which would be backwards
incompatible. I'd be open to doing it, but it'd require some thought about
how it would impact users and how we'd migrate them to the updated rule (or
just agree that it is a bug and that including upgrade notes would be
sufficient).
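
(As a concrete point of reference, granting the Describe-level group access
the KIP settles on would look roughly like this with the stock ACL tool; the
principal and group name here are made up:)

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:alice \
  --operation Describe --group my-consumer-group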

-Ewen

On Mon, Jul 10, 2017 at 1:12 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> I'm bumping this up again to get some feedback, especially from some of
> the committers, on the KIP and on the note below.
>
> Thanks.
> --Vahid
>
>
>
>
> From:   "Vahid S Hashemian" 
> To: d...@kafka.apache.org
> Cc: "Kafka User" 
> Date:   06/21/2017 12:49 PM
> Subject:Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL
> Permission of OffsetFetch
>
>
>
> I appreciate everyone's feedback so far on this KIP.
>
> Before starting a vote, I'd like to also ask for feedback on the
> "Additional Food for Thought" section in the KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 163%3A+Lower+the+Minimum+Required+ACL+Permission+of+OffsetFetch#KIP-163:
> LowertheMinimumRequiredACLPermissionofOffsetFetch-AdditionalFoodforThought
>
> I just added some more details in that section, which I hope further
> clarifies the suggestion there.
>
> Thanks.
> --Vahid
>
>
>
>
>
>
>
>
>
>
>


Schema Registry on DC/OS

2017-07-24 Thread Debasish Ghosh
Hi -

Is it possible to run schema registry service on DC/OS ? I checked the
Confluent Kafka package on Mesosphere Universe. It doesn't have any support
for running Schema Registry. Also this thread
https://stackoverflow.com/questions/37322078/cant-start-confluent-2-0-apache-kafka-schema-registry-in-dc-os
on SoF says the same thing. Just wondering if there has been any progress
on this front ..

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Kafka error after SSL enabled - Bootstrap broker-name :6667 disconnected (org.apache.kafka.clients.NetworkClient)

2017-07-24 Thread karan alang
Hello - I've enabled SSL for Kafka, and Kafka is starting up fine with SSL
enabled.

However, when I run the Kafka console producer, it gives me the error shown
below:



Command:

/usr/hdp/2.5.3.0-37/kafka/bin/kafka-console-producer.sh --broker-list nwk2-bdp-kafka-05.gdcs-qa.apple.com:6667,nwk2-bdp-kafka-04.gdcs-qa.apple.com:6667,nwk2-bdp-kafka-06.gdcs-qa.apple.com:6667 --topic sslTopic --producer.config /tmp/ssl-kafka/client-ssl.properties

Message typed on the console:

hi

On typing the message on the console producer, I get the following error:

[2017-07-24 19:10:22,940] WARN Bootstrap broker nwk2-bdp-kafka-06.gdcs-qa.apple.com:6667 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-07-24 19:10:23,106] WARN Bootstrap broker nwk2-bdp-kafka-05.gdcs-qa.apple.com:6667 disconnected (org.apache.kafka.clients.NetworkClient)


Attached is the client-ssl.properties file, used to start the console
producer:

security.protocol=SSL
ssl.truststore.location=/tmp/ssl-kafka/client.truststore.jks
ssl.truststore.password=changeit
ssl.keystore.location=/tmp/ssl-kafka/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS

kafka-consumer-groups tool with SASL_PLAINTEXT

2017-07-24 Thread Meghana Narasimhan
Hi,
What is the correct way to use the kafka-consumer-groups tool with
SASL_PLAINTEXT security enabled ?

The tool seems to work fine with the PLAINTEXT port but not with
SASL_PLAINTEXT. Can it be configured to work with SASL_PLAINTEXT? If so,
what permissions have to be enabled for it?

Thanks,
Meghana


Re: Kafka error after SSL enabled - Bootstrap broker-name :6667 disconnected (org.apache.kafka.clients.NetworkClient)

2017-07-24 Thread karan alang
Here is what I see in the logs.
So, it seems the Kafka broker is starting up with SSL; however, the
controller is not able to connect to the broker.

server.log

>
> [2017-07-24 20:57:19,461] INFO [ThrottledRequestReaper-Produce], Starting
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2017-07-24 20:57:19,464] INFO [ThrottledRequestReaper-Fetch], Starting
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> [2017-07-24 20:57:19,467] INFO Will not load MX4J, mx4j-tools.jar is not
> in the classpath (kafka.utils.Mx4jLoader$)
> [2017-07-24 20:57:19,474] INFO [Group Metadata Manager on Broker 1001]:
> Removed 0 expired offsets in 7 milliseconds.
> (kafka.coordinator.GroupMetadataManager)
> [2017-07-24 20:57:19,498] INFO Creating /brokers/ids/1001 (is it secure?
> false) (kafka.utils.ZKCheckedEphemeral)
> [2017-07-24 20:57:19,508] INFO Result of znode creation is: OK
> (kafka.utils.ZKCheckedEphemeral)
> [2017-07-24 20:57:19,510] INFO Registered broker 1001 at path
> /brokers/ids/1001 with addresses: PLAINTEXT -> EndPoint(
> nwk2-bdp-kafka-04.gdcs-qa.apple.com,6668,PLAINTEXT),SSL -> EndPoint(
> nwk2-bdp-kafka-04.gdcs-qa.apple.com,6667,SSL) (kafka.utils.ZkUtils)
> [2017-07-24 20:57:19,526] INFO [Kafka Server 1001], started
> (kafka.server.KafkaServer)



controller.log

[2017-07-24 20:59:56,323] WARN
> [Controller-1001-to-broker-1001-send-thread], Controller 1001's connection
> to broker nwk2-bdp-kafka-04.gdcs-qa.apple.com:6667 (id: 1001 rack: null)
> was unsuccessful (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to
> nwk2-bdp-kafka-04.gdcs-qa.apple.com:6667 (id: 1001 rack: null) failed
> at
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
> at
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
> at
> kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
> at
> kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
> at
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
> at
> kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:233)
> at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:182)
> at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:181)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)


On Mon, Jul 24, 2017 at 12:36 PM, karan alang  wrote:

> Hello - i've enabled SSL for Kafka, and Kafka is starting up fine with SSL
> enable.
>
> However, when i run the Kafka console producer, it is give me error as
> shown below ->
>
>
>
>1. Command :
>2.
>3. /usr/hdp/2.5.3.0-37/kafka/bin/kafka-console-producer.sh --broker-list
>nwk2-bdp-kafka-05.gdcs-qa.apple.com:6667,nwk2-bdp-kafka-04.gdcs-qa.
>apple.com:6667,nwk2-bdp-kafka-06.gdcs-qa.apple.com:6667 --topic
>sslTopic --producer.config /tmp/ssl-kafka/client-ssl.properties
>4.
>5. Message Typed on console :
>6.  hi
>7.
>8. On Typing message on the Console Producer, i get the following
>error :
>9.
>10. [2017-07-24 19:10:22,940] WARN Bootstrap broker nwk2-bdp-kafka-
>06.gdcs-qa.apple.com:6667 disconnected (org.apache.kafka.clients.Netw
>orkClient)
>11. [2017-07-24 19:10:23,106] WARN Bootstrap broker nwk2-bdp-kafka-
>05.gdcs-qa.apple.com:6667 disconnected (org.apache.kafka.clients.Netw
>orkClient)
>
>
> Attached is the client-ssl.properties file, used to start the Console
> produce
>
>


Re: kafka-consumer-groups tool with SASL_PLAINTEXT

2017-07-24 Thread Vahid S Hashemian
Hi Meghana,

I did some experiments with SASL_PLAINTEXT and documented the results 
here:
https://developer.ibm.com/opentech/2017/05/31/kafka-acls-in-practice/
I think it covers what you'd like to achieve. If not, please advise.
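
In short, the trick is to run the tool against a client config file that 
carries the SASL settings, roughly like this (assuming SASL/PLAIN; the file 
path, credentials, and bootstrap address are placeholders):

# /tmp/client-sasl.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="alice" password="alice-secret";

bin/kafka-consumer-groups.sh --bootstrap-server broker:9093 \
  --command-config /tmp/client-sasl.properties --list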

Thanks.
--Vahid




From:   Meghana Narasimhan 
To: users@kafka.apache.org
Date:   07/24/2017 01:56 PM
Subject:kafka-consumer-groups tool with SASL_PLAINTEXT



Hi,
What is the correct way to use the kafka-consumer-groups tool with
SASL_PLAINTEXT security enabled ?

The tool seems to work fine with PLAINTEXT port but not with
SASL_PLAINTEXT. Can it be configured to work with SASL_PLAINTEXT ? If so
what permissions have to enabled for it ?

Thanks,
Meghana






Re: Tuning up mirror maker for high thruput

2017-07-24 Thread James Cheng
Todd,

I have a question about the OS/broker tuning that you are talking about on the 
source cluster. Aside from mirrormaker (which you say should be running in the 
remote destination datacenter), presumably there will be other consumers in the 
source datacenter as well. How does the OS/broker tuning affect those consumers 
that are close to the source datacenter? Will they continue to function well?

-James

> On Jul 23, 2017, at 7:16 AM, Todd Palino  wrote:
> 
> One of the best pieces of advice I can offer is that you really need to run
> the mirror maker in the same physical/network location as the Kafka cluster
> you are producing to. Latency on the consumer side can be more easily
> absorbed than latency on the producer side, as to assure that we have
> proper message ordering and reliability, we need to restrict in flight
> batches to 1. So that means that our produce connection is contstrained to
> be very thin, and latency makes a huge impact. Meanwhile, on the consume
> side we’re fetching large batches of messages, many at a time, so
> round-trip latency has less of an impact. I really can’t stress this
> enough. We set up some mirror makers in the opposite configuration for
> security reasons, and it’s been a huge problem even with tuning.
> 
> In addition to this, you will want to assure that your OS (and then the
> mirror maker and broker) tuning is taking into account the latency. Here’s
> a good reference for the OS side (for Linux):
> http://linuxczar.net/blog/2016/09/18/bandwidth-delay-product/
> 
> Once you have the OS tuned, you’ll need to adjust the broker tuning on the
> clusters you are consuming from, since that is the high latency side. The
> configuration for that is socket.send.buffer.bytes, and it probably makes
> sense to set this to -1 (which means use the OS configuration). You can do
> the same with socket.receive.buffer.bytes, but it’s not as critical with
> this setup. On the mirror maker, the configuration is on the consumer side,
> and it’s called receive.buffer.bytes. Again, you can set this to -1 to use
> the OS configuration. Make sure to restart the applications after making
> all these changes, of course.
> 
> -Todd
> 
> 
> On Sat, Jul 22, 2017 at 1:27 AM, James Cheng  wrote:
> 
>> Becket Qin from LinkedIn spoke at a meetup about how to tune the Kafka
>> producer. One scenario that he described was tuning for situations where
>> you had high network latency. See slides at https://www.slideshare.net/
>> mobile/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600
>> and video at https://youtu.be/oQe7PpDDdzA
>> 
>> -James
>> 
>> Sent from my iPhone
>> 
>>> On Jul 21, 2017, at 9:25 AM, Sunil Parmar  wrote:
>>> 
>>> We're trying to set up mirror maker to mirror data from EU dc to US dc.
>> The
>>> network delay is ~150 ms. In recent test; we realized that mirror maker
>> is
>>> not keeping up with load and have a lag trending upward all the time.
>>> 
>>> What are configurations that can be tuned up to make it work for the
>> higher
>>> throughput ?
>>> How to decide number of producer and consumer threads ? ( number of topic
>>> partitions / brokers ? )
>>> 
>>> 
>>> *Environment* ( both source and destination cluster )
>>> 
>>> Kafka version 0.9 ( Cloudera 0.9.0.0+kafka2.0.0+188 )
>>> 
>>> queue.size = 1
>>> queue.byte.size = 100MB
>>> 
>>> 2 brokers on source, 3 brokers on destination
>>> 
>>> 
>>> *Mirror maker configs :*
>>> 
>>> Producer properties :
>>> request.timeout.ms=12
>>> block.on.buffer.full=TRUE
>>> retries=20
>>> acks=all
>>> 
>>> 
>>> Consumer properties:
>>> request.timeout.ms=12
>>> auto.offset.reset=latest
>>> enable.auto.commit=false
>>> 
>>> We've configured 4 producer and consumer threads.
>>> There is no security set up as of now so it's all PLAINTEXT.
>>> 
>>> We have 4 topics are white listed to sync from EU to US. Only one of them
>>> is high throughput. We also have a message handler to strip off some
>>> sensitive information from EU to US but it only works on a low thru put
>>> topic; the message handler still try to process the other topics but let
>> it
>>> pass thru.
>>> 
>>> Thanks,
>>> Sunil Parmar
>> 
> 
> 
> 
> -- 
> *Todd Palino*
> Senior Staff Engineer, Site Reliability
> Data Infrastructure Streaming
> 
> 
> 
> linkedin.com/in/toddpalino



Re: Tuning up mirror maker for high thruput

2017-07-24 Thread Todd Palino
We haven’t had any problem after tuning the default send/receive buffers in
the OS up to 10MB. Linux uses a sliding window, so if you have short
latencies, you won’t use as much of the buffer and you should see very
little, if any, impact.
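
(For reference, the Linux knobs involved are roughly these; a hedged sketch
only, using the 10MB ceiling we have been discussing rather than a universal
recommendation:)

# e.g. /etc/sysctl.d/99-kafka-buffers.conf
net.core.rmem_max = 10485760
net.core.wmem_max = 10485760
net.ipv4.tcp_rmem = 4096 87380 10485760
net.ipv4.tcp_wmem = 4096 65536 10485760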

-Todd


On Mon, Jul 24, 2017 at 2:20 PM, James Cheng  wrote:

> Todd,
>
> I have a question about the OS/broker tuning that you are talking about on
> the source cluster. Aside from mirrormaker (which you say should be running
> in the remote destination datacenter), presumably there will be other
> consumers in the source datacenter as well. How does the OS/broker tuning
> affect those consumers that are close to the source datacenter? Will they
> continue to function well?
>
> -James
>
> > On Jul 23, 2017, at 7:16 AM, Todd Palino  wrote:
> >
> > One of the best pieces of advice I can offer is that you really need to
> run
> > the mirror maker in the same physical/network location as the Kafka
> cluster
> > you are producing to. Latency on the consumer side can be more easily
> > absorbed than latency on the producer side, as to assure that we have
> > proper message ordering and reliability, we need to restrict in flight
> > batches to 1. So that means that our produce connection is contstrained
> to
> > be very thin, and latency makes a huge impact. Meanwhile, on the consume
> > side we’re fetching large batches of messages, many at a time, so
> > round-trip latency has less of an impact. I really can’t stress this
> > enough. We set up some mirror makers in the opposite configuration for
> > security reasons, and it’s been a huge problem even with tuning.
> >
> > In addition to this, you will want to assure that your OS (and then the
> > mirror maker and broker) tuning is taking into account the latency.
> Here’s
> > a good reference for the OS side (for Linux):
> > http://linuxczar.net/blog/2016/09/18/bandwidth-delay-product/
> >
> > Once you have the OS tuned, you’ll need to adjust the broker tuning on
> the
> > clusters you are consuming from, since that is the high latency side. The
> > configuration for that is socket.send.buffer.bytes, and it probably makes
> > sense to set this to -1 (which means use the OS configuration). You can
> do
> > the same with socket.receive.buffer.bytes, but it’s not as critical with
> > this setup. On the mirror maker, the configuration is on the consumer
> side,
> > and it’s called receive.buffer.bytes. Again, you can set this to -1 to
> use
> > the OS configuration. Make sure to restart the applications after making
> > all these changes, of course.
> >
> > -Todd
> >
> >
> > On Sat, Jul 22, 2017 at 1:27 AM, James Cheng 
> wrote:
> >
> >> Becket Qin from LinkedIn spoke at a meetup about how to tune the Kafka
> >> producer. One scenario that he described was tuning for situations where
> >> you had high network latency. See slides at https://www.slideshare.net/
> >> mobile/JiangjieQin/producer-performance-tuning-for-apache-
> kafka-63147600
> >> and video at https://youtu.be/oQe7PpDDdzA
> >>
> >> -James
> >>
> >> Sent from my iPhone
> >>
> >>> On Jul 21, 2017, at 9:25 AM, Sunil Parmar 
> wrote:
> >>>
> >>> We're trying to set up mirror maker to mirror data from EU dc to US dc.
> >> The
> >>> network delay is ~150 ms. In recent test; we realized that mirror maker
> >> is
> >>> not keeping up with load and have a lag trending upward all the time.
> >>>
> >>> What are configurations that can be tuned up to make it work for the
> >> higher
> >>> throughput ?
> >>> How to decide number of producer and consumer threads ? ( number of
> topic
> >>> partitions / brokers ? )
> >>>
> >>>
> >>> *Environment* ( both source and destination cluster )
> >>>
> >>> Kafka version 0.9 ( Cloudera 0.9.0.0+kafka2.0.0+188 )
> >>>
> >>> queue.size = 1
> >>> queue.byte.size = 100MB
> >>>
> >>> 2 brokers on source, 3 brokers on destination
> >>>
> >>>
> >>> *Mirror maker configs :*
> >>>
> >>> Producer properties :
> >>> request.timeout.ms=12
> >>> block.on.buffer.full=TRUE
> >>> retries=20
> >>> acks=all
> >>>
> >>>
> >>> Consumer properties:
> >>> request.timeout.ms=12
> >>> auto.offset.reset=latest
> >>> enable.auto.commit=false
> >>>
> >>> We've configured 4 producer and consumer threads.
> >>> There is no security set up as of now so it's all PLAINTEXT.
> >>>
> >>> We have 4 topics are white listed to sync from EU to US. Only one of
> them
> >>> is high throughput. We also have a message handler to strip off some
> >>> sensitive information from EU to US but it only works on a low thru put
> >>> topic; the message handler still try to process the other topics but
> let
> >> it
> >>> pass thru.
> >>>
> >>> Thanks,
> >>> Sunil Parmar
> >>
> >
> >
> >
> > --
> > *Todd Palino*
> > Senior Staff Engineer, Site Reliability
> > Data Infrastructure Streaming
> >
> >
> >
> > linkedin.com/in/toddpalino
>
>


-- 
*Todd Palino*
Senior Staff Engineer, Site Reliability
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: Kafka error after SSL enabled - Bootstrap broker-name :6667 disconnected (org.apache.kafka.clients.NetworkClient)

2017-07-24 Thread karan alang
Further update: I recreated the certificates, and here is the result of the
verification.

(I read in one post that the CN should match the FQDN, otherwise it gives this
error. Any ideas on how to debug this?)


openssl s_client -debug -connect nwk2-bdp-kafka-04.gdcs-qa.apple.com:6667 -tls1
CONNECTED(0003)
write to 0x8bd830 [0x908c33] (155 bytes => 155 (0x9B))
 - 16 03 01 00 96 01 00 00-92 03 01 59 76 79 79 99 ...Yvyy.
0010 - 65 b5 a8 26 4c 80 20 9f-cc 73 86 b7 e0 ff b6 93 e..&L. ..s..
0020 - e4 bf 05 b7 34 0c 39 01-c1 b5 f6 00 00 4c c0 14 4.9..L..
0030 - c0 0a 00 39 00 38 00 88-00 87 c0 0f c0 05 00 35 ...9.8.5
0040 - 00 84 c0 13 c0 09 00 33-00 32 00 9a 00 99 00 45 ...3.2.E
.
..
0570 - 32 d9 53 62 8d 34 47 ab-10 39 0e 16 ee ef ca 02 2.Sb.4G..9..
0580 - c6 37 12 a7 da 60 69 d3-48 1c 2d 5e f1 9d 55 da .7...`i.H.-^..U.
0590 - cd 11 e8 eb 18 bc ca b8-82 72 98 e7 67 a8 9e 0e .r..g...
05a0 - 5f 05 6d c0 ae 23 0f c5-8c cf 77 0e _.m..#w.
05af - 
depth=0 C = us, ST = ca, L = nwk, O = gdcs, OU = gdcs-qa, CN =
nwk2-bdp-kafka-04.gdcs-qa.apple.com
verify error:num=18:self signed certificate
verify return:1
depth=0 C = us, ST = ca, L = nwk, O = gdcs, OU = gdcs-qa, CN =
nwk2-bdp-kafka-04.gdcs-qa.apple.com
verify return:1
write to 0x8bd830 [0x90e100] (143 bytes => 143 (0x8F))
 - 16 03 01 00 8a 10 00 00-86 85 04 00 c2 51 e7 95 .Q..
0010 - 9a f9 56 c3 78 c7 1a 92-ba 0e 5a e7 17 48 81 d9 ..V.x.Z..H..
0020 - 25 6a ce 4a 83 2c 31 d1-5a e4 ee d8 b7 db 9e 64 %j.J.,1.Z..d
0030 - 79 e5 e9 c0 58 a4 40 2b-5c 33 69 d7 2b 5f f5 f9 y...X.@+\3i.+_..
0040 - dc 96 2a e7 d6 7c be b9-bd ae 91 11 b3 01 69 0d ..*..|i.
0050 - f8 45 01 81 44 13 98 d8-10 27 b8 d0 ee c9 50 51 .E..D'PQ
0060 - 85 b3 ab 23 46 d7 c1 65-77 d4 57 d0 25 79 4c 48 ...#F..ew.W.%yLH
0070 - c5 03 1d b9 45 43 c8 e2-d4 6b ce 7c 7b 5f 8e a0 EC...k.|{_..
0080 - f7 cf 82 ec c2 66 a4 10-79 28 03 7f 74 6e b2.f..y(..tn.
write to 0x8bd830 [0x90e100] (6 bytes => 6 (0x6))
 - 14 03 01 00 01 01 ..
write to 0x8bd830 [0x90e100] (53 bytes => 53 (0x35))
 - 16 03 01 00 30 c2 b9 f5-bc 0f fb ce 98 f4 a1 fb 0...
0010 - 11 e3 70 b5 5c 14 27 88-72 e0 96 b4 95 cf 86 f5 ..p.\.'.r...
0020 - 8e 88 91 ff f8 58 b1 a2-cc c5 62 17 a6 c2 22 9a .Xb...".
0030 - 9a 90 80 7d 04...}.
read from 0x8bd830 [0x9046e3] (5 bytes => 5 (0x5))
 - 14 03 01 00 01.
read from 0x8bd830 [0x9046e8] (1 bytes => 1 (0x1))
 - 01.
read from 0x8bd830 [0x9046e3] (5 bytes => 5 (0x5))
 - 16 03 01 00 300
read from 0x8bd830 [0x9046e8] (48 bytes => 48 (0x30))
 - ff bc bf 23 4d fa 4b 8d-cb fc 28 10 c0 c4 57 c8 ...#M.K...(...W.
0010 - 53 14 f7 77 65 71 e5 60-44 a9 27 7b 69 11 fc a9 S..weq.`D.'{i...
0020 - 10 52 f9 06 d3 d9 00 07-e8 5a f0 35 79 23 18 9b .R...Z.5y#..
---
Certificate chain
0 s:/C=us/ST=ca/L=nwk/O=gdcs/OU=gdcs-qa/CN=nwk2-bdp-kafka-04.gdcs-qa.apple.com
i:/C=us/ST=ca/L=nwk/O=gdcs/OU=gdcs-qa/CN=nwk2-bdp-kafka-04.gdcs-qa.apple.com
---
Server certificate
-BEGIN CERTIFICATE-
MIIDvTCCAqWgAwIBAgIEbFXDGDANBgkqhkiG9w0BAQsFADB3MQswCQYDVQQGEwJ1
czELMAkGA1UECBMCY2ExDDAKBgNVBAcTA253azENMAsGA1UEChMEZ2RjczEQMA4G
A1UECxMHZ2Rjcy1xYTEsMCoGA1UEAxMjbndrMi1iZHAta2Fma2EtMDQuZ2Rjcy1x
YS5hcHBsZS5jb20wHhcNMTcwNzI0MjIzNTE2WhcNMTgwNzE5MjIzNTE2WjB3MQsw
CQYDVQQGEwJ1czELMAkGA1UECBMCY2ExDDAKBgNVBAcTA253azENMAsGA1UEChME
Z2RjczEQMA4GA1UECxMHZ2Rjcy1xYTEsMCoGA1UEAxMjbndrMi1iZHAta2Fma2Et
MDQuZ2Rjcy1xYS5hcHBsZS5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQDZxDGpPOh17dhxKnTwdDYLXXQL6Kkq4DLQ56x0DgJGGW2zwfeBhfNpOOnE
6P02NE8BLenSvMs/FqMHJ+ywtCGp/Yxth9QUeheVAr8qHPV7rvnN1p1OL7ezyzQY
d/pwu2KP5c/ROX3izfpMIVvF+04njw56ZMkmHECiTs6Cel3P9649TkTn62ssdlhC
HZT0TaYmoMgEW4Viv5XvEC8TCHTJT03O2zD2JM+P4rFa/JeSjeY7MBHzwMb7O/uV
dqNRQi9ziTfxSA9xCz72nZkLUhk0LGkecoVRaFiImWesQ3xJ/ys4DvAaHY2XeU3g
HMGIiQh0zSvq5xX3EIEa5hOBhgJ3AgMBAAGjUTBPMC4GA1UdEQQnMCWCI253azIt
YmRwLWthZmthLTA0LmdkY3MtcWEuYXBwbGUuY29tMB0GA1UdDgQWBBSc6pEu8gEu
/6xddU9riRIwPQwKBDANBgkqhkiG9w0BAQsFAAOCAQEAckfOcvs2SrdodvHo2DUE
LqkizsSE2T1RI0VNIejDSOZq4kjctj0skUPbu/EyUqt78ZObXQgf4uZHXLKnMp4o
Em2qs/XrQN+SiaFEE/o1ng5XvBBJJbFoAjmh5rNeX621vnx/pqWqNVs+bgwAsfM2
sGESAJqbukm4VgLXuDLBhkbdwhx2E8NT9GnqloJRFeAWjcwQGYsIuXKa7jU1eO4b
MAwWSxW1wk/w3cyZ50j4WgPNM4imFbHjq6B3cUjyU0vFwqbv7SEMTHsFV24X/7n5
+mIASEqRWfgATmTqsKFvmgsFvEZhi8FPoR0yRAZcz78WSijt0NWVFO5KDG1Y12Ok
OQ==
-END CERTIFICATE-
subject=/C=us/ST=ca/L=nwk/O=gdcs/OU=gdcs-qa/CN=nwk2-bdp-kafka-04.gdcs-qa.apple.com
issuer=/C=us/ST=ca/L=nwk/O=gdcs/OU=gdcs-qa/CN=nwk2-bdp-kafka-04.gdcs-qa.apple.com
---
No client certificate CA names sent
Server Temp Key: ECDH, secp521r1, 521 bits
---
SSL handshake has read 1519 bytes and written 357 bytes
---
New, TLSv1/SSLv3, Cipher is ECDHE-RSA-AES256-SHA
Server public key is 2048 bit
Secure Renegotiation IS supported
Compression: NONE
Expansion: NONE
SSL-Session:
Protocol: TLSv1
Cipher: ECDHE-RSA-AES256-SHA
Session-ID: 59767979D3C289D1EB584B04C9CB1DF4659C017296247CC84BB1F7D7842B

Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

2017-07-24 Thread Vahid S Hashemian
Hi Ewen,

Thanks for reviewing the KIP.

Your comment about the "food for thought" section makes sense. It seems 
like a bug to me, not sure how you and others feel about it. I'll remove 
it for now, and open a separate JIRA for it, so we have a record of it.
The read vs. write discussion and fixing the confusion seems to be an even 
bigger task, and will be addressed in its own KIP, if necessary.

The KIP will be updated shortly.

Thanks again.
--Vahid




From:   Ewen Cheslack-Postava 
To: d...@kafka.apache.org
Cc: Kafka User 
Date:   07/24/2017 10:36 AM
Subject:Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL 
Permission of OffsetFetch



Vahid,

Thanks for the KIP. I think we're mostly in violent agreement that the 
lack
of any Write permissions on consumer groups is confusing. Unfortunately
it's a pretty annoying issue to fix since it would require an increase in
permissions. More generally, I think it's unfortunate because by squeezing
all permissions into the lowest two levels, we have no room for 
refinement,
e.g. if we realize some permission needs to have a lower level of access
but higher than Describe, without adding new levels.

I'm +1 on the KIP. I don't think it's ideal given the discussion of Read 
vs
Write since I think Read is the correct permission in theory, but given
where we are now it makes sense.

Regarding the extra food for thought, I think such a change would require
some plan for how to migrate people over to it. The main proposal in the
KIP works without any migration plan because it is reducing the required
permissions, but changing the requirement for listing a group to Describe
(Group) would be adding/changing the requirements, which would be 
backwards
incompatible. I'd be open to doing it, but it'd require some thought about
how it would impact users and how we'd migrate them to the updated rule 
(or
just agree that it is a bug and that including upgrade notes would be
sufficient).

-Ewen

On Mon, Jul 10, 2017 at 1:12 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> I'm bumping this up again to get some feedback, especially from some of
> the committers, on the KIP and on the note below.
>
> Thanks.
> --Vahid
>
>
>
>
> From:   "Vahid S Hashemian" 
> To: d...@kafka.apache.org
> Cc: "Kafka User" 
> Date:   06/21/2017 12:49 PM
> Subject:Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL
> Permission of OffsetFetch
>
>
>
> I appreciate everyone's feedback so far on this KIP.
>
> Before starting a vote, I'd like to also ask for feedback on the
> "Additional Food for Thought" section in the KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 163%3A+Lower+the+Minimum+Required+ACL+Permission+of+OffsetFetch#KIP-163:
> 
LowertheMinimumRequiredACLPermissionofOffsetFetch-AdditionalFoodforThought
>
> I just added some more details in that section, which I hope further
> clarifies the suggestion there.
>
> Thanks.
> --Vahid
>
>
>
>
>
>
>
>
>
>
>






Re: Consumer blocked when auth failed.

2017-07-24 Thread 陈江枫
Hi,
I intentionally used the wrong credentials and got the following log:

java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:81)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:335)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:279)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:168)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:244)


Any idea how to throw the exception in the main thread instead of just blocking?

2017-07-20 14:11 GMT+08:00 Jason Gustafson :

> What log level do you have configured? You might bump up to DEBUG or TRACE
> and see if anything stands out.
>
> -Jason
>
> On Tue, Jul 18, 2017 at 7:59 PM, 陈江枫  wrote:
>
> > Hi,
> >
> > I was integrating Kafka with Spark, using DirectStream. When my
> > authentication fails, the stream just blocks. No log, no exceptions were
> > thrown. Could someone help to address such a situation?
> >
>


Re: Schema Registry on DC/OS

2017-07-24 Thread Kaufman Ng
Confluent Schema Registry is available in the DC/OS Universe, see here for
the package definitions
https://github.com/mesosphere/universe/tree/dcd777a7e429678fd74fc7306945cdd27bda3b94/repo/packages/C/confluent-schema-registry/5

The stackoverflow thread is quite out of date, as it mentions Confluent
Platform 2.0 (the latest version as of now is 3.2.2).


On Mon, Jul 24, 2017 at 2:16 PM, Debasish Ghosh 
wrote:

> Hi -
>
> Is it possible to run schema registry service on DC/OS ? I checked the
> Confluent Kafka package on Mesosphere Universe. It doesn't have any support
> for running Schema Registry. Also this thread
> https://stackoverflow.com/questions/37322078/cant-start-
> confluent-2-0-apache-kafka-schema-registry-in-dc-os
> on SoF says the same thing. Just wondering if there has been any progress
> on this front ..
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>



-- 
Kaufman Ng
+1 646 961 8063
Solutions Architect | Confluent | www.confluent.io


Kafka Streams internal repartitioning topics, retention time, and reprocessing old data

2017-07-24 Thread Gerd Behrmann
Hi,

While adding a new Streams based micro service to an existing Kafka 
infrastructure, I have run into some issues processing older data in existing 
topics. I am uncertain of the exact cause of the problems, but am looking for 
advice to clarify how things are supposed to work to eliminate possibilities. 
The following is my hypothesis of what may be happening; maybe somebody can 
tell me why what I describe is impossible - or whether this might be a bug. The 
following relates to Kafka 0.10.2.1 (confluent distribution).



TL;DR: When (re)processing old data with Kafka Streams in a topology that 
causes the stream to be repartitioned, the records in the -repartition topic 
carry the timestamp of the original input records as extracted by the timestamp 
extractor. The default retention policy of the -repartition topic is however 7 
days, allowing Kafka to delete data from the -repartition topic even before the 
Streams application has a chance of reading it back in.





The situation is basically that I have a topic with existing records dating 
back several months. Each record contains a timestamp and a client identifier 
(among other things). The task is quite simple: Produce an output topic that 
contains the largest timestamp for each client. Distilled this looks something 
like this:

    class Record {
        ...
        long client;
        long time;
        ...
    }

    builder.stream(EARLIEST, Bytes(), Record(), "input")
           .map((hash, record) -> new KeyValue<>(record.client, record.time))
           .groupByKey(Long(), Long())
           .reduce(Long::max, "store")
           .to(Long(), Long(), "output");

where Bytes(), Long(), Record() return the appropriate Serde. This takes each 
input record and throws the original key away, repartitions on the embedded id, 
runs a reduction operation keeping the largest timestamp, and stores the result 
back into a topic. The repartitioning causes an internal topic to be created. 
This topic will have a cleanup.policy=delete and the server default retention 
policy of 7 days. 


I am using the default FailOnInvalidTimestamp timestamp extractor. As far as I 
can determine this causes the record in the -repartition topic to have the same 
metadata timestamps as the input records. Also, as far as I can see in the 
Kafka server side code, log segments will be deleted once the largest timestamp 
(as extracted from the records stored in the segment) is older than the 
retention policy. 


This is where I wonder how this is supposed to work when ingesting months-old 
data: it would appear that Kafka could start to delete segments of the 
-repartition topic aggressively as the timestamps are several months old. This could 
happen even before Kafka Streams had a chance to read the data back in for the 
reduce operation. The Kafka server log would seem to support that this happens 
as I see several segments be created *and deleted* right after the application 
was started:

   [2017-07-24 09:37:54,735] INFO Rolled new log segment for 
‘xxx-repartition-2' in 1 ms. (kafka.log.Log)
   [2017-07-24 09:37:54,735] INFO Scheduling log segment 0 for log 
xxx-repartition-2 for deletion. (kafka.log.Log)


This repeats quite a number of times for the first half hour or so - presumably 
until the computation has caught up with newer data that didn’t get deleted 
right away. Also the client side log seemed to indicate that something like 
this was happening:


   2017-07-24 09:45:02,550  INFO StreamThread stream-thread [StreamThread-1] no 
custom setting defined for topic xxx-repartition using original config earliest 
for offset reset


This message, too, repeated quite a number of times for the first half hour or so. 
Looking at the Kafka Streams code, this message would get logged as a result of 
the consumer failing due to an invalid offset.


Assuming my theory is correct, I could probably solve this problem by using a 
WallclockTimestampExtractor (something I will test tomorrow). However one of 
the use cases often repeated in the Kafka material is that one can reprocess 
old data - surely it must be possible to reprocess using a timestamp extractor 
that reflects the original time, not the current processing time?
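
(For the record, the switch I intend to test is just the timestamp extractor
config; a minimal sketch, assuming the 0.10.2-era property name, which later
releases rename to default.timestamp.extractor:)

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

Properties props = new Properties();
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          WallclockTimestampExtractor.class.getName());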


I tried to Google for information about retention time and internal topics. The 
closest thing I could come up with is that Kafka 0.11 has gained support for 
applications asking for records before a particular offset to be deleted; a 
future Kafka Streams version could use this to eliminate the need for having a 
retention time on the internal topic and thus resolve the problem.



Cheers,

Gerd

Re: Schema Registry on DC/OS

2017-07-24 Thread Debasish Ghosh
Thanks a lot .. I found it among the community-supported packages in the
DC/OS UI. Installed it and it runs OK. One question - is there any CLI for
confluent-schema-registry? dcos package install confluent-schema-registry
--cli does not give anything ..

regards.

On Tue, Jul 25, 2017 at 9:50 AM, Kaufman Ng  wrote:

> Confluent Schema Registry is available in the DC/OS Universe, see here for
> the package definitions https://github.com/mesosphere/universe/tree/
> dcd777a7e429678fd74fc7306945cdd27bda3b94/repo/packages/C/
> confluent-schema-registry/5
>
> The stackoverflow thread is quite out of date, as it mentions Confluent
> Platform 2.0 (the latest version as of now is 3.2.2).
>
>
> On Mon, Jul 24, 2017 at 2:16 PM, Debasish Ghosh 
> wrote:
>
>> Hi -
>>
>> Is it possible to run schema registry service on DC/OS ? I checked the
>> Confluent Kafka package on Mesosphere Universe. It doesn't have any
>> support
>> for running Schema Registry. Also this thread
>> https://stackoverflow.com/questions/37322078/cant-start-conf
>> luent-2-0-apache-kafka-schema-registry-in-dc-os
>> on SoF says the same thing. Just wondering if there has been any progress
>> on this front ..
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>
>
>
> --
> Kaufman Ng
> +1 646 961 8063
> Solutions Architect | Confluent | www.confluent.io
>
>
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg