Interprocess Akka

Akka allows for interprocess communication (be it on the same machine or elsewhere).

You’ll need the akka-cluster dependency in your POM file (I left it in there in the previous code sample by accident).

In your supervising actor (you interact with this Master actor, and it then interacts with all the Slave actors on your behalf), you’ll need to register for cluster events.

    private Cluster cluster = null;
 
    public Master() {
        cluster = Cluster.get(getContext().system());
    }
 
    @Override
    public void preStart() throws Exception {
        super.preStart();
        // Listen for remote slaves joining and leaving
        cluster.subscribe(getSelf(),
                ClusterEvent.initialStateAsEvents(),
                ClusterEvent.MemberEvent.class,
                ClusterEvent.UnreachableMember.class);
    }
 
    @Override
    public void postStop() throws Exception {
        cluster.unsubscribe(getSelf());
    }

public Master() {
cluster = Cluster.get(getContext().system());
}

@Override
public void preStart() throws Exception {
super.preStart();
// Listen for remote slaves joining and leaving
cluster.subscribe(getSelf(),
ClusterEvent.initialStateAsEvents(),
ClusterEvent.MemberEvent.class,
ClusterEvent.UnreachableMember.class);
}

@Override
public void postStop() throws Exception {
cluster.unsubscribe(getSelf());
}

You’ll now receive messages relating to members joining and members leaving (technically becoming unreachable) which you can handle in your onReceive handler.

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof ClusterEvent.MemberUp) {
            // Member joined
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            // Member gone
        } else {
            unhandled(message);
        }
    }

When you bootstrap the Akka subsystem, you’ll also need to tell Akka what IP address and port to bind to (the bindAddress) and what the exposed IP and port are (the exposedAddress). This enables it to work in a NAT’d environment. If you get ‘dropping message for non-local recipient’, chances are that the exposedAddress doesn’t match what the slave has sent.

public void runMaster(final String bindAddress, final String exposedAddress) {
    String[] bindParts = bindAddress.split(":");
    String[] exposedParts = exposedAddress.split(":");
 
    List<String> seeds = new ArrayList<String>();
    seeds.add(String.format("akka.tcp://SimpleAkka@%s:%s", exposedParts[0], exposedParts[1]));
    List<String> roles = new ArrayList<String>();
    roles.add("master");
    Config config = com.typesafe.config.ConfigFactory.empty()
            .withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
            .withValue("akka.actor.serializers.java", ConfigValueFactory.fromAnyRef("akka.serialization.JavaSerializer"))
            .withValue("akka.remote.untrusted-mode", ConfigValueFactory.fromAnyRef("off"))
            .withValue("akka.remote.netty.tcp.bind-hostname", ConfigValueFactory.fromAnyRef(bindParts[0]))
            .withValue("akka.remote.netty.tcp.bind-port", ConfigValueFactory.fromAnyRef(bindParts[1]))
            .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(exposedParts[0]))
            .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(exposedParts[1]))
            .withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromIterable(seeds))
            .withValue("akka.cluster.auto-down-unreachable-after", ConfigValueFactory.fromAnyRef("10s"))
            .withValue("akka.cluster.roles", ConfigValueFactory.fromIterable(roles))
            .withFallback(ConfigFactory.load());
 
    ActorSystem system = ActorSystem.create("SimpleAkka", config);
    ActorRef master = system.actorOf(Props.create(Master.class), "master");
    master.tell(new Master.SayHello(), ActorRef.noSender());
 
    // Every 10 seconds, send a sayHello message
    while (true) {
        try {
            Thread.sleep(10000);
        } 
        catch (InterruptedException e) {
            // Don't care
        }
 
        master.tell(new Master.SayHello(), ActorRef.noSender());
    }
}

List<String> seeds = new ArrayList<String>();
seeds.add(String.format("akka.tcp://SimpleAkka@%s:%s", exposedParts[0], exposedParts[1]));
List<String> roles = new ArrayList<String>();
roles.add("master");
Config config = com.typesafe.config.ConfigFactory.empty()
.withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
.withValue("akka.actor.serializers.java", ConfigValueFactory.fromAnyRef("akka.serialization.JavaSerializer"))
.withValue("akka.remote.untrusted-mode", ConfigValueFactory.fromAnyRef("off"))
.withValue("akka.remote.netty.tcp.bind-hostname", ConfigValueFactory.fromAnyRef(bindParts[0]))
.withValue("akka.remote.netty.tcp.bind-port", ConfigValueFactory.fromAnyRef(bindParts[1]))
.withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(exposedParts[0]))
.withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(exposedParts[1]))
.withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromIterable(seeds))
.withValue("akka.cluster.auto-down-unreachable-after", ConfigValueFactory.fromAnyRef("10s"))
.withValue("akka.cluster.roles", ConfigValueFactory.fromIterable(roles))
.withFallback(ConfigFactory.load());

ActorSystem system = ActorSystem.create("SimpleAkka", config);
ActorRef master = system.actorOf(Props.create(Master.class), "master");
master.tell(new Master.SayHello(), ActorRef.noSender());

// Every 10 seconds, send a sayHello message
while (true) {
try {
Thread.sleep(10000);
}
catch (InterruptedException e) {
// Don’t care
}

master.tell(new Master.SayHello(), ActorRef.noSender());
}
}

On the slave side, when you bootstrap the Akka system, you’ll need to tell it to join the Master node. Likewise, this needs to know what IP address and port to bind to, what IP address and port it considers local, but also the IP address and port of the Master node.

public void runSlave(final String seedAddress, final String bindAddress, final String exposedAddress) {
    String[] seedParts = seedAddress.split(":");
    String[] bindParts = bindAddress.split(":");
    String[] exposedParts = exposedAddress.split(":");
 
    List<Address> seeds = new ArrayList<Address>();
    String seedNodeDetails = String.format("akka.tcp://SimpleAkka@%s:%s", seedParts[0], seedParts[1]);
    System.out.println("Seed node: " + seedNodeDetails);
    seeds.add(AddressFromURIString.parse(seedNodeDetails));
 
    List<String> roles = new ArrayList<String>();
    roles.add("slave");
 
    Config config = com.typesafe.config.ConfigFactory.empty()
            .withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
            .withValue("akka.actor.serializers.java", ConfigValueFactory.fromAnyRef("akka.serialization.JavaSerializer"))
            .withValue("akka.remote.untrusted-mode", ConfigValueFactory.fromAnyRef("off"))
            .withValue("akka.remote.netty.tcp.bind-hostname", ConfigValueFactory.fromAnyRef(bindParts[0]))
            .withValue("akka.remote.netty.tcp.bind-port", ConfigValueFactory.fromAnyRef(bindParts[1]))
            .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(exposedParts[0]))
            .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(exposedParts[1]))
            .withValue("akka.cluster.auto-down-unreachable-after", ConfigValueFactory.fromAnyRef("10s"))
            .withValue("akka.cluster.roles", ConfigValueFactory.fromIterable(roles))
            .withFallback(ConfigFactory.load());
 
    ActorSystem system = ActorSystem.create("SimpleAkka", config);
    ActorRef slave = system.actorOf(Props.create(Slave.class), "slave");
    Cluster.get(system).joinSeedNodes(JavaConversions.asScalaBuffer(seeds).toList());
    system.awaitTermination();
}

List<Address> seeds = new ArrayList<Address>();
String seedNodeDetails = String.format("akka.tcp://SimpleAkka@%s:%s", seedParts[0], seedParts[1]);
System.out.println("Seed node: " + seedNodeDetails);
seeds.add(AddressFromURIString.parse(seedNodeDetails));

List<String> roles = new ArrayList<String>();
roles.add("slave");

Config config = com.typesafe.config.ConfigFactory.empty()
.withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
.withValue("akka.actor.serializers.java", ConfigValueFactory.fromAnyRef("akka.serialization.JavaSerializer"))
.withValue("akka.remote.untrusted-mode", ConfigValueFactory.fromAnyRef("off"))
.withValue("akka.remote.netty.tcp.bind-hostname", ConfigValueFactory.fromAnyRef(bindParts[0]))
.withValue("akka.remote.netty.tcp.bind-port", ConfigValueFactory.fromAnyRef(bindParts[1]))
.withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(exposedParts[0]))
.withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(exposedParts[1]))
.withValue("akka.cluster.auto-down-unreachable-after", ConfigValueFactory.fromAnyRef("10s"))
.withValue("akka.cluster.roles", ConfigValueFactory.fromIterable(roles))
.withFallback(ConfigFactory.load());

ActorSystem system = ActorSystem.create("SimpleAkka", config);
ActorRef slave = system.actorOf(Props.create(Slave.class), "slave");
Cluster.get(system).joinSeedNodes(JavaConversions.asScalaBuffer(seeds).toList());
system.awaitTermination();
}

Finally, you’ll also need to collapse the various reference.conf files to avoid a ‘No configuration setting found for key ‘akka.remote.log-received-messages” error – in the Maven shade plugin configuration, below where you declare the jar’s main class, insert

<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>reference.conf</resource>
</transformer>

If you use my code in BitBucket, then v2 has the above changes and you can see it by running the master up in a console

$ java -jar target/SimpleAkka-2.0.0.jar master 0.0.0.0:2551 127.0.0.1:2551
master received 'sayHello' at 1444294633817
New member: akka.tcp://SimpleAkka@127.0.0.1:2552 (2 total members)
master received 'sayHello' at 1444294643820
WorkResponse: akka.tcp://SimpleAkka@127.0.0.1:2552 start 1444294643841msec, end 1444294643946msec, duration 105msec

and a slave in another window. Every 10 seconds, messages should be printed to the screen.

$ java -jar target/SimpleAkka-2.0.0.jar slave 127.0.0.1:2551 0.0.0.0:2552 127.0.0.1:2552
Seed node: akka.tcp://SimpleAkka@127.0.0.1:2551
slave received 'WorkRequest' at 1444294643841

This is my personal blog - all views are my own.

Tagged with: , , ,