Taking it for a drive
So now comes the really fun stuff... a recommendation engine and its interaction with Kafka et al. I have borrowed the concept from the Kafka mailing list; but, as you shall see in the next post, a variety of different apps can benefit from this eventing framework. But first, a digression on the use case.
The use case is simple. There is a recommendation engine that recommends one of a set of products to the logged-in users. The users get to see the recommendation in near real time, as the recommendation engine makes its recommendation based on what it sees about the user. The logged-in users are published to one topic (Topic1) in Kafka as they log in. There are multiple instances of the recommendation engine, each of which is responsible for a list of users. The current demo, for expediency's sake, makes this list static; but architecturally this is a dynamic list. The users are eagerly awaiting the recommendations from the recommendation engine.
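To make the "each instance is responsible for a list of users" idea concrete, here is a minimal sketch of how one instance might decide whether a login event is its business. This is not the code of the actual recommendation_engine module; the module name rec_sketch, the function names, and the product list are all made up for illustration.

```erlang
%% Hypothetical sketch, not the real recommendation_engine module.
-module(rec_sketch).
-export([new/1, handle_login/2]).

%% Build the set of users this instance is responsible for.
%% The demo uses a static list; a dynamic list would update this set.
new(Users) ->
    sets:from_list(Users).

%% Decide what to do with one logged-in user read from the topic.
%% Users that belong to another instance are simply ignored.
handle_login(User, Assigned) ->
    case sets:is_element(User, Assigned) of
        true  -> {recommend, User, pick_product(User)};
        false -> ignore
    end.

%% Placeholder "recommendation": hash the user onto a made-up product list.
pick_product(User) ->
    Products = [book, movie, gadget],
    lists:nth(erlang:phash2(User, length(Products)) + 1, Products).
```

With Assigned = rec_sketch:new(["user9", "user23", "user46"]), a call such as rec_sketch:handle_login("user1", Assigned) returns ignore, while "user9" yields a {recommend, "user9", Product} tuple.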
Now we are going to start an instance of the kafka_client (refer to my previous post on the Erlang client for Kafka and the code). In the code, we currently write to Topic1 in Kafka, which must already exist:
kafka_client:start_link(in_mps).
Then start the recommendation engine, followed by the kafka_subscription engine that will read logged-in users from Kafka and publish them into the mps realm:
recommendation_engine:start_link(["user9", "user23", "user46"]).
kafka_subscription:start_link(in_mps, publish).
Now generate artificial data through
recommendation_example:generate_data(100). % generates 100 simulated users in Kafka
and the recommendation_engine manages to get recommendations to the users.
Generate some more users in Kafka
recommendation_example:generate_data(100).
and you see updated results on the recommendations.
WE ARE AWARE OF A BUG IN kafka_protocol.erl THAT DOES NOT HANDLE BROKEN TCP DATA CORRECTLY. UNTIL IT IS FIXED, THE BUG WILL MANIFEST ITSELF INTERMITTENTLY BY THROWING AN EXCEPTION, FOR WHICH THE ONLY REMEDY CURRENTLY IS TO RESTART THE ERLANG NODE (STOP IT WITH q(). AND START IT AGAIN).
UPDATE 06/16/2013 -- This bug has been fixed in 0.4.6 on GitHub. Please refresh at will.
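For readers wondering what "broken TCP data" looks like in practice: TCP delivers a byte stream, so a single read can end in the middle of a message. Kafka prefixes each message on the wire with a 4-byte big-endian length, so a reader has to keep leftover bytes around until the rest arrives. The sketch below illustrates that general technique only. It is not the fix that went into kafka_protocol.erl, and the module name frame_sketch is made up.

```erlang
%% Illustrative sketch of length-prefixed framing; not the actual fix.
-module(frame_sketch).
-export([extract_frames/1]).

%% Split a buffer into all complete frames plus the leftover bytes.
%% The caller prepends the leftover to the next chunk read from the socket.
extract_frames(Buffer) when is_binary(Buffer) ->
    extract_frames(Buffer, []).

%% A full frame is available: 4-byte length, then exactly Len payload bytes.
extract_frames(<<Len:32/big-unsigned, Payload:Len/binary, Rest/binary>>, Acc) ->
    extract_frames(Rest, [Payload | Acc]);
%% Fewer bytes than a full frame: stop and hand back what is left.
extract_frames(Incomplete, Acc) ->
    {lists:reverse(Acc), Incomplete}.
```

For example, a buffer holding one complete 2-byte frame followed by the start of a 3-byte frame returns the first payload and leaves the partial frame untouched for the next read. Opening the socket with the gen_tcp option {packet, 4} is another way to have the runtime do this reassembly for you.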