Wojket Gawronski's post here, neatly summarises the issues with GenEvent. Fortunately there are alternatives including gproc, phoenix_pubsub, and Elixir's upcoming Process registry.
The code used in these examples is available from this repository.
gproc
While gproc's main purpose is as a process registry, it can be used as pub/sub framework using a lovely trick.
defmodule PubsubSpike.Gproc do
use GenServer
def start_link(topic, otp_opts \\ []) do
GenServer.start_link(__MODULE__, topic, otp_opts)
end
def broadcast(topic, message) do
GenServer.cast({:via, :gproc, gproc_key(topic)},
{:broadcast, message})
end
def messages_received(pid) do
GenServer.call(pid, :messages_received)
end
def init(topic) do
:gproc.reg(gproc_key(topic))
{:ok, []}
end
def handle_cast({:broadcast, message}, messages_received) do
{:noreply, [message | messages_received]}
end
def handle_call(:messages_received, _from, messages_received) do
{:reply, Enum.reverse(messages_received), messages_received}
end
defp gproc_key(topic) do
{:p, :l, topic}
end
end
The code above (also here), illustrates using gproc in this way. In init
the GenServer
process registers itself with a particular key: {:p, :l, topic}
:
- The
:p
atom in the tuple, indicates to gproc that multiple processes may be registered using the same key. (If it was:n
, then the uniqueness of the key would be enforced.) - The
:l
just says that the process is registered locally, just on this node.:g
would register globally, across all connected nodes, but that involves a certain amount of faff. - And
topic
is our topic.
broadcast
broadcasts a message to all processes listening on a topic; this is implemented with GenServer.cast
. Rather than identifying the cast's target process with a pid
or a name in the Erlang process registry, it is identified using a via tuple, {:via, :gproc, gproc_key(topic)}
which delegates finding the pid(s) to gproc's whereis_name
function. Thus, all processes registered with that key (effectively listening to that topic) will receive the cast
.
This test shows it all working (code also here):
alias PubsubSpike.Gproc
test "broadcast messages" do
{:ok, pid1} = Gproc.start_link("sue")
{:ok, pid2} = Gproc.start_link("sue")
{:ok, pid3} = Gproc.start_link(:miranda)
Gproc.broadcast("sue", "Hi Sue!")
Gproc.broadcast(:miranda, "Hi Miranda!")
assert Gproc.messages_received(pid1) == ["Hi Sue!"]
assert Gproc.messages_received(pid2) == ["Hi Sue!"]
assert Gproc.messages_received(pid3) == ["Hi Miranda!"]
end
Elixir Registry
Registry is a more Elixir-like and built-in version of gproc. We can also use it as a pub/sub framework. A registry is a supervisor and must be started, such as in the application supervisor:
def start(_type, _args) do
import Supervisor.Spec, warn: false
children = [
supervisor(Registry, [:duplicate, :pubsub_elixir_registry]),
# ...
]
opts = [strategy: :one_for_one, name: PubsubSpike.Supervisor]
Supervisor.start_link(children, opts)
end
Note that we have called this registry :pubsub_elixir_registry
and have marked it as allowing duplicate keys.
Here is a pub/sub implementation similar to that for gproc:
defmodule PubsubSpike.ElixirRegistry do
use GenServer
def start_link(topic, otp_opts \\ []) do
GenServer.start_link(__MODULE__, topic, otp_opts)
end
def broadcast(topic, message) do
Registry.dispatch(:pubsub_elixir_registry, topic, fn entries ->
for {pid, _} <- entries, do: send(pid, {:broadcast, message})
end)
end
def messages_received(pid) do
GenServer.call(pid, :messages_received)
end
def init(topic) do
Registry.register(:pubsub_elixir_registry, topic, [])
{:ok, []}
end
def handle_info({:broadcast, message}, messages_received) do
{:noreply, [message | messages_received]}
end
def handle_call(:messages_received, _from, messages_received) do
{:reply, Enum.reverse(messages_received), messages_received}
end
end
Registries that allow duplicates are not allowed to service via tuples, so we cannot perform the same trick as we did with gproc. Instead, broadcast
uses Registry.dispatch to send a message to each process listening on a topic. We can use handle_info to receive messages to the subscribed topic.
The illustrative test, here, is so similar to the gproc version that I will not include it below.
Phoenix PubSub
Unlike gproc and Registry, Phoenix PubSub's primary purpose is as a pub/sub framework. There is a clue in the name.
Phoenix PubSub supports multiple implementations, such as a Phoenix PubSub Redis, but we will just use the built-in one based on Erlang's PG2. We will to start the Phoenix.PubSub.PG2
supervisor, in the application supervisor:
def start(_type, _args) do
import Supervisor.Spec, warn: false
children = [
supervisor(Phoenix.PubSub.PG2, [:pubsub_spike, []]),
# ..
]
opts = [strategy: :one_for_one, name: PubsubSpike.Supervisor]
Supervisor.start_link(children, opts)
end
Note that we have named the Phoenix PubSub supervisor, :pubsub_spike
.
Implementing with the same interface as the other examples, we get this, following:
defmodule PubsubSpike.PhoenixPubsub do
use GenServer
alias Phoenix.PubSub
def start_link(topic, otp_opts \\ []) when is_binary(topic) do
GenServer.start_link(__MODULE__, topic, otp_opts)
end
def broadcast(topic, message) do
PubSub.broadcast(:pubsub_spike, topic, {:pubsub_spike, message})
end
def messages_received(pid) do
GenServer.call(pid, :messages_received)
end
def init(topic) do
PubSub.subscribe(:pubsub_spike, topic)
{:ok, []}
end
def handle_call(:messages_received, _from, messages_received) do
{:reply, Enum.reverse(messages_received), messages_received}
end
def handle_info({:pubsub_spike, msg}, messages_received) do
{:noreply, [msg | messages_received]}
end
end
The interface to Phoenix Pub Sub is more straightforward than the other two, as it was designed specifically for pub/sub. We explicitly subscribed to a topic (in init
), broadcast to a topic (in broadcast
), and receive a message (in handle_info
). It is worth noting that PHoenix PubSub only supports binary (String) topics.
There is test code here, but is also so similar to the gproc test that it is not worth embedding in the blog.
Bonus: distributed events with Phoenix PubSub
A bonus of using Phoenix PubSub is that it can publishes events across connected nodes. This post's companion code is set up for you to easily play with this.
A PubsubSpike.PhoenixPubsub
worker (as above), named :phoenix_pubsub
and listening on the topic "topic:phoenix_pubsub", is created by its application supervisor:
def start(_type, _args) do
import Supervisor.Spec, warn: false
children = [
supervisor(Phoenix.PubSub.PG2, [:pubsub_spike, []]),
worker(PubsubSpike.PhoenixPubsub, ["topic:phoenix_pubsub",
[name: :phoenix_pubsub]]),
# ..
]
opts = [strategy: :one_for_one, name: PubsubSpike.Supervisor]
Supervisor.start_link(children, opts)
end
To try this out, (assuming you're all set up for Elixir) open a terminal and type the following.
git clone [email protected]:CultivateHQ/pubsub_spike.git
cd pubsub_spike
mix deps.get
iex --sname mel@localhost -S mix
In another terminal, but the same directory:
iex --sname sue@localhost -S mix
Node.connect(:"mel@localhost") # should return true
PubsubSpike.PhoenixPubsub.broadcast("topic:phoenix_pubsub", "Hello all!")
PubsubSpike.PhoenixPubsub.messages_received(:phoenix_pubsub)
messages_received
should return ["Hello all!"]
. Now in the other terminal (Node "mel@localhost"), try
PubsubSpike.PhoenixPubsub.messages_received(:phoenix_pubsub)
That too should return ["Hello all!"]
. The message has been broadcast from the other node.
Summary
You may already be using gproc as a registry for dynamic processes; if so, it may be convenient to also use it's pub/sub capabilities. In most cases it would be better to use the built-in Registry
if you need the events to be propagated locally.
Use Phoenix.PubSub
if you want events to be propagated to all connected nodes.
Updates
- 2018-02-27 Update to Elixir 1.6; changed references to
Registry
being an upcoming feature of Elixir 1.4 to it being built-in to Elixir; altered the summary to be clearer aboutPhoenix.PubSub
events propagating across all connected nodes, whileRegistry
being for local events.