Friday, June 14, 2013

Introducing mps, an eventing framework for Apache Kafka 0.8 - PART II

Taking it for a drive

So now comes the really fun stuff... a recommendation engine and its interaction with Kafka et al. I have borrowed the concept from the Kafka mailing list; and as you shall see in the next post, a variety of different apps can benefit from this eventing framework. But first, a digression on the use case.

The use case is simple. A recommendation engine recommends one of a set of products to logged-in users. The users see the recommendations in near real time, as the recommendation engine makes its recommendations based on what it sees about each user. As users log in, they are written to one topic (Topic1) in Kafka. There are multiple instances of the recommendation engine, each of which is responsible for a list of users. The current demo, for expediency's sake, makes this list static; but architecturally this is a dynamic list. The users are eagerly awaiting the recommendations from the recommendation engine.
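The per-instance responsibility described above can be sketched roughly as follows. This is illustrative only; the module and function names here are hypothetical and not part of mps or the demo code:

```erlang
%% Hypothetical sketch: each recommendation_engine instance holds the
%% (currently static) list of users it is responsible for and ignores
%% login events for everyone else.
-module(engine_filter).
-export([handle_login/2]).

%% Called for every logged-in user read from Topic1; only users owned
%% by this instance get a recommendation.
handle_login(User, OwnedUsers) ->
    case lists:member(User, OwnedUsers) of
        true  -> {recommend, User};
        false -> ignore
    end.
```

With the demo's static list, `engine_filter:handle_login("user23", ["user9", "user23", "user46"])` would yield `{recommend, "user23"}`, while any other user is ignored.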





Now we are going to start an instance of the kafka_client (refer to my previous post on the Erlang client for Kafka and the code). The code currently writes to Topic1 in Kafka, which must already exist:

kafka_client:start_link(in_mps).


and then start the recommendation engine, followed by the kafka_subscription engine that reads logged-in users from Kafka and publishes them into the mps realm:

recommendation_engine:start_link(["user9", "user23", "user46"]).
kafka_subscription:start_link(in_mps, publish).




Now generate artificial data:

recommendation_example:generate_data(100).    % generates 100 simulated users in kafka

and the recommendation_engine gets recommendations out to the users.





Generate some more users in Kafka:

recommendation_example:generate_data(100).

and you will see updated recommendations.




WE ARE AWARE OF A BUG IN kafka_protocol.erl THAT DOES NOT HANDLE BROKEN TCP DATA CORRECTLY. UNTIL IT IS FIXED, THIS BUG WILL MANIFEST ITSELF INTERMITTENTLY BY THROWING AN EXCEPTION; CURRENTLY THE ONLY REMEDY IS TO RESTART THE ERLANG NODE with q().

UPDATE 06/16/2013 -- This bug has been fixed in 0.4.6 on github. Please refresh at will.





Introducing mps, an eventing framework for Apache Kafka 0.8 - PART I

Apache Kafka is reliable messaging infrastructure that is used in production at various companies. It communicates with other software through its wire protocol.

mps (massive publish subscribe) is an eventing framework on top of Kafka that enables user-land code to communicate with Kafka, with the ambition of enabling event-driven applications to utilize Kafka without having to worry about the wire protocol. mps is open source and available at github.com/milindparikh/mps. It's still a baby, so be gentle. It is written in Erlang and rests on the shoulders of giants; of particular mention are OTP, Riak_Core, and Cowboy.

This post, which is about setting up mps and seeing some toy demos, is the precursor to the next post, which will explain the technical underpinnings of mps. In the meantime, and for the impatient, the source is the guide (at least for those who grok Erlang).


Kicking the tires

Building mps is described in README.md at github.com/milindparikh/mps.
It does require some hand-holding, but essentially you need "make rel" to complete cleanly. You must have started Kafka on localhost at port 9092, unless you grok Erlang (in which case you can change the code).
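The steps above boil down to something like the following (a sketch only; README.md at github.com/milindparikh/mps is the authoritative source, and the Kafka start command assumes a stock Kafka 0.8 checkout):

```shell
# Fetch and build mps (requires Erlang/OTP and make on the PATH)
git clone https://github.com/milindparikh/mps.git
cd mps
make rel                      # must complete cleanly

# In another terminal, Kafka 0.8 must already be listening on localhost:9092,
# started from your Kafka installation, e.g.:
#   bin/kafka-server-start.sh config/server.properties
```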

./rel/mps/bin/mps console will start you off with a set of 64 vnodes on which ~65000 htopics (which are very different from topics in Kafka) will be deployed. It also comes with a web server at localhost:8080.

At the command prompt of the console, you must type (with the period):

mps:create_stream(). 

This deploys the ~65000 htopics on the 64 vnodes, and it takes time (30 seconds or so, at least on my Ubuntu-powered virtual machine); eventually it reassures you by printing [ok,ok,...].
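To give a rough picture of the placement, here is an illustrative sketch of how ~65000 htopics could land on 64 vnodes via hashing. This is not mps's actual placement logic (mps sits on Riak_Core, whose consistent-hash ring is more involved); the module name and scheme are assumptions for illustration:

```erlang
%% Illustrative only: map an htopic name deterministically to one of
%% 64 vnodes using Erlang's portable hash.
-module(htopic_placement).
-export([vnode_for/1]).

-define(NUM_VNODES, 64).

%% Returns a vnode index in 0..63 for a given htopic name.
vnode_for(HTopic) ->
    erlang:phash2(HTopic, ?NUM_VNODES).
```

The same htopic always hashes to the same vnode, so lookups and publishes for a given htopic land on one owner.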



Now open up two browser sessions at localhost:8080 and localhost:8080/publish. You must first subscribe to an event within your subscription client, specifying a Topic and a Key range. So in the screenshot below, you are subscribing to Topic1, Key1 in the subscription client. Then, at the publishing client, you can publish a value that can be seen at the subscription client.



Trying something more adventurous, com.yahoo.sales.* at the subscription level and com.yahoo.sales.books with a value of 100 at the publishing level, gives a clearer picture of key filtering at the subscription and publishing levels.


The important thing to note is that there are no Kafka topics involved so far at all (and there will not be in this section). This independence, as you shall see later, is part of the key (pun not intended) to the eventing framework. You can actually broadcast a message as well, e.g. to com.(yahoo|amazon).sales.books.
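One way to think about patterns like com.yahoo.sales.* and com.(yahoo|amazon).sales.books is as regular expressions over dotted keys. The following is illustrative only, not mps's actual matcher; it translates a subscription pattern into a regex by escaping literal dots and expanding `*`, while `(a|b)` groups pass through as alternation:

```erlang
%% Illustrative key matcher: "." separates segments, "*" matches any
%% suffix, "(a|b)" is alternation -- NOT mps's real implementation.
-module(key_filter).
-export([matches/2]).

%% True if Key satisfies the subscription Pattern.
matches(Pattern, Key) ->
    re:run(Key, to_regex(Pattern), [{capture, none}]) =:= match.

to_regex(Pattern) ->
    %% Escape literal dots, then turn "*" into ".*".
    Escaped = re:replace(Pattern, "\\.", "\\\\.", [global, {return, list}]),
    Wild    = re:replace(Escaped, "\\*", ".*", [global, {return, list}]),
    "^" ++ Wild ++ "$".
```

Under this scheme, `key_filter:matches("com.yahoo.sales.*", "com.yahoo.sales.books")` and `key_filter:matches("com.(yahoo|amazon).sales.books", "com.amazon.sales.books")` both succeed, while keys outside the pattern do not.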


NEXT UP -- Introducing mps, an eventing framework for Apache Kafka 0.8 - PART II


Thursday, June 13, 2013

The Erlang client for Apache Kafka 0.8

Apache Kafka 0.8 (http://kafka.apache.org/) represents a significant evolution in distributed messaging systems and is of specific interest to me (more about that later).

The Erlang client for Apache Kafka 0.8 has been written to interface Erlang with Kafka. This client has the verbs and nouns to talk directly to Kafka at the wire-protocol level. Since Kafka 0.8 introduced a couple of neat features, I thought that I would talk about them here.


Since Kafka has the ability to multiplex different requests through the correlation id, we thought that it would be useful to bake this into the implementation. This also means that the responsibility of managing the relationship is pushed up the chain.

All of the formation of requests and the parsing of responses occurs in kafka_protocol. The protocol needs a little help from kafka_client to manage the TCP connection. Essentially, kafka_client sends the request formatted through kafka_protocol to Kafka, but does NOT wait for the response at the time of the request.

At the backend, as the response for a request becomes available (identified by its correlation id), it is parsed by kafka_protocol and sent to kafka_client. The range of the correlation id tells kafka_protocol which of the six request types the response was for. In that way, the correlation id is opaque to an end client of kafka_client.
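The bookkeeping this implies can be sketched as a pending-request table keyed by correlation id. This is an illustrative sketch, not kafka_client's actual internals; the module and function names are assumptions:

```erlang
%% Illustrative correlation-id bookkeeping: record which request type
%% went out under each correlation id, so the matching response can be
%% routed to the right parser when it arrives.
-module(corr_ids).
-export([new/0, register_request/3, take_response/2]).

%% Pending requests as a list of {CorrelationId, RequestType} pairs,
%% where RequestType is e.g. produce, fetch, or metadata.
new() -> [].

register_request(Cid, ReqType, Pending) ->
    [{Cid, ReqType} | Pending].

%% On response arrival: recover the request type and drop the entry.
take_response(Cid, Pending) ->
    case lists:keytake(Cid, 1, Pending) of
        {value, {Cid, ReqType}, Rest} -> {ok, ReqType, Rest};
        false                         -> {unknown_cid, Pending}
    end.
```

Because the table is consulted only when the response arrives, the caller never blocks at request time, matching the asynchronous send described above.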


A typical invocation looks like:
       
        Pid = kafka_client:locate_kafka_client().

%% produce for Topic1, Partition 0 with two messages

        Cid = kafka_client:request_produce(Pid, [{<<"Topic1">>, [{0, [{<<"Key1">>, <<"Msg1">>}, {<<"Key1">>, <<"Msg1">>}]}]}]).

        Rid = kafka_client:response_produce(Pid, Cid).

The syntax of produce (as with other requests) is designed to be compatible with Kafka's philosophy of generating produce (and other) requests for multiple topics, and multiple partitions within a topic, through the same request.
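Following the same nested-list shape as the invocation above, a single request can span several topics and partitions; for example (topic, partition, key, and message names here are made up for illustration):

```erlang
%% One produce request covering two topics, with Topic1 written on two
%% partitions -- same kafka_client:request_produce/2 shape as above.
Cid = kafka_client:request_produce(Pid,
        [{<<"Topic1">>, [{0, [{<<"Key1">>, <<"Msg1">>}]},
                         {1, [{<<"Key2">>, <<"Msg2">>}]}]},
         {<<"Topic2">>, [{0, [{<<"Key3">>, <<"Msg3">>}]}]}]).
```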


erlkafka.hrl has all of the defined constants.


The whole thing is available on github at https://github.com/milindparikh/mps. The kafka client is embedded in this project as part of the larger goal of providing an eventing framework on top of kafka (more about that soon).

UPDATE 06/14/2013: WE ARE AWARE OF A BUG IN kafka_protocol.erl THAT DOES NOT HANDLE BROKEN TCP DATA CORRECTLY. UNTIL IT IS FIXED, THIS BUG WILL MANIFEST ITSELF INTERMITTENTLY BY THROWING AN EXCEPTION; CURRENTLY THE ONLY REMEDY IS TO RESTART THE ERLANG NODE.

UPDATE 06/16/2013: This bug has been fixed in 0.4.6 and also in master.