Monday, June 12, 2023

Introducing "standard streamed commands and events", or "stdce"

This blog introduces a stream-based command/event programming model that can be applied to many programming languages. Having an event-driven approach to programming like this helps achieve resiliency and responsiveness. Resiliency should speak for itself, and responsiveness often leads to an improved user experience.

Since writing up on Event-driven Finite State Machines a couple of years back, colleagues and I have worked on edfsm and the flip-flop protocol. "edfsm" is a Rust implementation of that previous write-up, and the flip-flop protocol is a half-duplex protocol for conveying commands and events. Commands and events are the common denominators of everything I've just referred to, and they form the foundation of what I'd like to introduce today: a programming model that has gradually surfaced for me, now given a little formality, and one that can be used to shape many types of program, including those presently solved by "function-as-a-service" (FaaS). That's the last mention of FaaS here though; I don't want this idea to be constrained to that one, albeit popular, approach to programming!

The approach I wish to formalise uses standard streams (stdio) and is essentially this:

  • stdin is used to receive a continuous stream of commands
  • stdout is used to emit a continuous stream of events where events can be of type "logged" or "ephemeral"
...and that's it.

The approach enables long-running event-driven programs to be written in virtually any programming language that supports stdio, and most programming languages do. Furthermore, the approach conveys the notion that events can be stored in an event log (logged), or that they may come and go where their loss is insignificant (ephemeral).

Pipelines can also be achieved given stdio. For example, a program hosted in an OCI container might be communicated with via a Unix Domain Socket that is bridged to the program's stdio, producing commands to it and consuming events from it.

Using stdio to receive a stream of commands and emit a stream of events may not even be so novel. I'd be interested to learn of similar efforts and make comparisons. That said, this particular blog isn't going to fully formalise the approach either. That's better served with examples. But, I would like to capture a name for the approach and introduce it at a high level. To that end, I'm going to name this approach "standard streamed commands and events", or "stdce" for short.

Based on the work I've done in this space already, here are some commands and events that may be required. There will probably be more!

Commands

A command is delivered on stdin as JSON using the following form:

{
  "command": "<some-command>",
  "args": {
    "<some-arg>": <some-arg-value>
  }
}

The following commands are envisaged:
  • Configure
  • HandleRecoveryEvent
  • WaitForEvents
  • HandleEvent
The Configure command is the first command received and provides any secrets and other configurations that the program may require. I've previously used secrets to decrypt command payloads and encrypt event payloads.

HandleRecoveryEvent is sent for each event sourced from an associated log, i.e. previously "logged" events. A program may reconstitute its state from these recovery events. A WaitForEvents command will be sent when all recovery events have been sent.

HandleEvent is sent for each event that occurs post-recovery. Events may be sourced from many topics including those produced by other processes.
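
To make the command shapes concrete, here is a minimal Rust sketch, assuming the adjacently tagged JSON form above and the serde and serde_json crates; the argument fields inside each variant are illustrative placeholders rather than part of any specification.

use serde::Deserialize;
use serde_json::Value;

// Matches the `{"command": "...", "args": {...}}` form above. The field
// names inside each variant's args are hypothetical.
#[derive(Debug, Deserialize)]
#[serde(tag = "command", content = "args")]
enum Command {
    Configure { secrets: Value },
    HandleRecoveryEvent { event: Value },
    WaitForEvents,
    HandleEvent { event: Value },
}

fn parse_command(line: &str) -> serde_json::Result<Command> {
    serde_json::from_str(line)
}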

Events

An event is output to stdout as JSON using the following form:

{
  "event": "<some-event>",
  "type": "<event-type>",
  "args": {
    "<some-arg>": <some-arg-value>
  }
}

Any event that the program wishes to emit can be conveyed using an event type of "Logged" or "Ephemeral".
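
Putting the two halves together, here is a minimal sketch of a stdce program in Rust, assuming one JSON document per line on stdin and one per line on stdout, and using the serde_json crate; the event name emitted below is purely hypothetical.

use std::io::{self, BufRead, Write};
use serde_json::{json, Value};

fn main() -> io::Result<()> {
    let stdin = io::stdin();
    let stdout = io::stdout();
    let mut out = stdout.lock();

    // One JSON command per line on stdin; one JSON event per line on stdout.
    for line in stdin.lock().lines() {
        let line = line?;
        let cmd: Value = match serde_json::from_str(&line) {
            Ok(v) => v,
            Err(_) => continue, // ignore malformed input in this sketch
        };
        match cmd["command"].as_str() {
            Some("Configure") => { /* stash secrets and other configuration from cmd["args"] */ }
            Some("HandleRecoveryEvent") => { /* fold a previously logged event into state */ }
            Some("WaitForEvents") => { /* recovery is complete; live events follow */ }
            Some("HandleEvent") => {
                // Emit an illustrative "Logged" event in response.
                let event = json!({
                    "event": "SomethingHappened", // hypothetical event name
                    "type": "Logged",
                    "args": {}
                });
                writeln!(out, "{}", event)?;
                out.flush()?;
            }
            _ => {} // unknown commands are ignored
        }
    }
    Ok(())
}

A pipeline can then drive such a program by writing command lines to its stdin and reading event lines from its stdout, whether directly or bridged via something like a Unix Domain Socket.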

Configuration

Not mentioned so far, but of course, a program will need some configuration: the "topics" to subscribe to, namespaces, and other settings. This information can be communicated outside of stdce and is left as an implementation concern.

Next steps

This blog is a starting point, and somewhere for me to just get the idea down and name it. I'd like to get some feedback and then expand by fleshing out a project on GitHub. Hopefully, more to come soon then...

Sunday, February 7, 2021

Event-driven Finite State Machines

I often use Event-driven Finite State Machines to capture stateful behaviour. Doing so in combination with a compiler with a strong type system helps me to think about all possible combinations of the effects of commands on a given state, and what the events produced (if any) are. In particular, I find many programs required for embedded applications can often be safely expressed by using Event-driven Finite State Machines, leading to very high-quality outcomes. They're also great for managing those stateful scenarios where network communication takes place.

My intention here is to capture a pattern that I've found myself using a great deal. I didn't come up with this pattern; in fact, for me it evolved out of using Akka persistence for event sourcing (1). Such machines are known as Event-driven finite state machines, described as ones where the "transition from one state to another is triggered by an event or a message" (2).

Here are the two function signatures that I use:

fn for_input(s: State, c: Command) -> Vec<Event>

fn transition(s: State, e: Event) -> State

The first function yields zero or more events, given a state and a command. The second function applies an event to a state to yield a new state.

Commands are an enumeration of actions that can be performed on the state machine and are defined exclusively within the confines of it. Events can come from many places outside of the state machine and cause commands to manifest themselves for the for_input function. For example, the state machine might not be aware of a network, yet network events such as the receipt of data can manifest as one or more commands to submit to the for_input function.

The for_input function carries out the command, which may involve side-effects, and yields zero or more events.

For each event returned from the for_input function, the transition function is then called, yielding a new state. The transition function avoids performing side-effects and is therefore pure. We should be able to replay events to produce a state without causing side-effects. More on that later.

These functions are able to be combined to perform commands and transition state e.g.:

fn run(s: State, c: Command) -> (Vec<Event>, State)

However, I find it useful to keep both of the functions distinct even when composed, as it helps think through:

  • what commands upon state produce what events; and
  • what events cause a transition from one state to another.

These considerations map nicely to FSM diagrams. Consider a simple start/stop machine as an example.

Here, the State enumeration comprises Initial and Idle, the commands are Start and Stop, and the events are Started and Stopped. Given the Initial state, if a Start command is received then a Started event should be produced, causing a transition to the Idle state.
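
As a sketch, the machine just described might look like this in Rust; the behaviour of Stop (returning to the Initial state via a Stopped event) is an assumption made for the purpose of the example.

// States, commands, and events for the start/stop machine described above.
enum State {
    Initial,
    Idle,
}

enum Command {
    Start,
    Stop,
}

enum Event {
    Started,
    Stopped,
}

// Zero or more events result from applying a command to a state.
fn for_input(s: State, c: Command) -> Vec<Event> {
    match (s, c) {
        (State::Initial, Command::Start) => vec![Event::Started],
        (State::Initial, Command::Stop) => vec![],  // nothing to stop yet
        (State::Idle, Command::Start) => vec![],    // already started
        (State::Idle, Command::Stop) => vec![Event::Stopped],
    }
}

// A pure function: applying an event to a state yields a new state.
fn transition(s: State, e: Event) -> State {
    match (s, e) {
        (State::Initial, Event::Started) => State::Idle,
        (State::Idle, Event::Stopped) => State::Initial,     // assumed for this sketch
        (State::Initial, Event::Stopped) => State::Initial,  // no change
        (State::Idle, Event::Started) => State::Idle,        // no change
    }
}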

Writing the code for these two separate concerns also maps quite nicely. Keeping the diagram up to date with the code, and treating it as the source-of-truth, pays dividends in communicating design and driving modifications to the code.

Another aspect of having the two functions is that the second permits the state to be restored by replaying a series of events. This is known as event sourcing (3). In a nutshell, event sourcing permits us to reconstruct the state by replaying the events historically. We can therefore avoid replaying commands and their effects and call just the transition function. By leveraging event sourcing, we can persist our events having called the for_input function, and subsequently restore the state machine's state even when the host program is restarted.
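
For example, given the transition function from the sketch above, recovery is just a fold over the persisted events; no side-effects occur because only transition is called.

// Reconstitute state by replaying previously persisted events.
fn recover(initial: State, events: impl IntoIterator<Item = Event>) -> State {
    events
        .into_iter()
        .fold(initial, |state, event| transition(state, event))
}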

States, Commands, and Events are often expressed in a language using enumerations. By leveraging pattern matching in languages such as Rust, Scala, and Kotlin, the compiler can help us ensure that we are thinking of all combinations of applying commands to state and events to state. Resist the temptation to use catch-all match arms for commands or events, as it is inevitable that there'll be a state combination that you've not considered, and we want the compiler to help us exhaust them all. That said, there are some commands that apply to all states. In our example, the Stop command may be universally applied no matter what the state. There are always exceptions.

The state objects will contain, well, state! This internal state can be shared between state objects leveraging the fact that there will only ever be one state at a time. Reference counting this internal state is common where value-copying may be expensive. In Rust, I've successfully used Rc<RefCell<T>> to share mutable internal states between each State enumeration.
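
A minimal, hypothetical sketch of that sharing is below; the Inner data and the SharedState name are inventions for illustration only.

use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical internal data shared by every state variant.
struct Inner {
    retries: u32,
}

enum SharedState {
    Initial(Rc<RefCell<Inner>>),
    Idle(Rc<RefCell<Inner>>),
}

fn start(s: SharedState) -> SharedState {
    match s {
        SharedState::Initial(inner) => {
            // The shared data can be mutated through the RefCell...
            inner.borrow_mut().retries += 1;
            // ...and the Rc is simply moved to the next state; the data is never copied.
            SharedState::Idle(inner)
        }
        other => other,
    }
}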

Happy stateful programming!

(1) https://doc.akka.io/docs/akka/current/typed/persistence.html
(2) https://en.wikipedia.org/wiki/Event-driven_finite-state_machine
(3) https://martinfowler.com/eaaDev/EventSourcing.html


Tuesday, January 28, 2020

Conflict-free replicated data as an application concern

A while back, I wrote about converging observations in IoT and described how we de-duplicate sensor data. This post is a follow-on in that it describes the same domain in terms of how we distribute the data itself. My inspiration for this particular post is from Colin Breck's excellent post on "Shared-Nothing Architectures for Server Replication and Synchronization". In particular, Colin describes the development of a distributable event log that he was involved with prior to the days of Kafka. We also have a distributed event log so I thought it'd be great to share our approach given that distributing data is hard and we have adopted a relatively simple solution, no doubt used by a few others.

In a nutshell, we move the problem of distributing data to the domain of the application. The closer this problem is to the consumer of the data, the easier it seems to be to handle (note: I'm not saying it is easy, just easier!). My previous article on converging observations is in much the same vein; de-duplicating data at the point of consumption is much less error-prone than de-duplicating at the source, given that there can be many sources that would otherwise need to know of each other.

A real-world example is needed.

We have a topic on a durable queue (our event log) that events are appended to. These events describe the IoT devices themselves, as distinct from recording observations. We call these events "end device events". Here's a complete list of them:

  • BatteryLevelUpdated
  • NameUpdated
  • NoncesUpdated
  • NwkAddrUpdated
  • NwkAddrRemoved
  • PositionUpdated
  • SecretsUpdated
(if you're interested, we have a public API to our system describing these events and more)

Our system has a cloud-based component and an edge-based one. We provision end devices in the cloud and record their observations at the edge (the edge is a farm in the case of our agriculture business).

NwkAddrUpdated, NwkAddrRemoved, and SecretsUpdated can only occur in the cloud. This is a design decision and reflects the use-case of provisioning end devices. When the edge connects to the cloud, it receives these events, which are sufficient for an end device to start sending its observations at the edge. The edge-based end-user may subsequently associate a position and a name with the device. As we receive data from an end device, battery levels and nonce values (for end device authentication) will also appear as events. These edge-based events are pushed to the cloud.

This single source of writing, at a fine level of granularity, results in a complete picture of end devices both in the cloud and at the edge - with no conflict resolution required!

Now, the doubters among you may be saying, "but what if I need to provide the position from both the cloud and the edge"... or something similar... Well, then you might have a PositionReset event representing a position provided from the cloud, i.e. you create a new event type!

Thus... event types dictate where they are written, hence my earlier assertion that distribution becomes an application concern.
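
As a hedged sketch of that rule, the end device events listed above might be mapped to their single writer like this; the Cloud/Edge assignments follow the description in this post and the code is purely illustrative.

// Which side of the system is allowed to write a given event type.
enum Origin {
    Cloud,
    Edge,
}

enum EndDeviceEvent {
    BatteryLevelUpdated,
    NameUpdated,
    NoncesUpdated,
    NwkAddrUpdated,
    NwkAddrRemoved,
    PositionUpdated,
    SecretsUpdated,
}

// The event type alone dictates where it may be written, so there is never
// more than one writer for any given type and no conflicts to resolve.
fn origin(e: &EndDeviceEvent) -> Origin {
    use EndDeviceEvent::*;
    match e {
        NwkAddrUpdated | NwkAddrRemoved | SecretsUpdated => Origin::Cloud,
        BatteryLevelUpdated | NameUpdated | NoncesUpdated | PositionUpdated => Origin::Edge,
    }
}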

We can also replicate events in the cloud for the purposes of backup and sharding. A nice feature of durable queues is that consumers can remember the offset of the last event that was consumed. Each consumer can, therefore, operate at its own pace, which is particularly useful when replicating between the cloud and the edge as the networks connecting them are typically unreliable. Replicating within the cloud is typically very fast and barely noticeable as the networks there are much faster and more reliable.

When writing events in the cloud we can use a partition key to determine which node is responsible for receiving the event. Again, there's only one node responsible for writing.
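
A simple sketch of that idea, assuming the partition key is something like an end device identifier and that the hash only needs to be consistent within one process; a real implementation would want a hash that is stable across processes and versions.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Map a partition key (e.g. an end device id) onto one of `nodes` nodes so
// that exactly one node is responsible for writing that key's events.
// Note: DefaultHasher is not stable across processes; this is a sketch only.
fn node_for(partition_key: &str, nodes: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    partition_key.hash(&mut hasher);
    hasher.finish() % nodes
}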

The downside of our approach is that if a node is lost, at the edge or in the cloud, then its associated events will be lost too. This is acceptable for our use-case though, as node failures are relatively rare.

The approach we've taken works for our use-cases in the world of IoT, where best-effort is quite acceptable as the network itself tends to be quite unreliable. There are trade-offs but, I believe, the above approach can also work for many use-cases outside of IoT.

Thanks for reading.

Friday, July 19, 2019

Converging observations in IoT

The problem is this. You have a sensor on a farm that transmits its observations. The observations can be received by one or more towers. Each tower forwards the observations it receives to a server that then stores them. You now have a situation where a single observation has been recorded twice. What to do?

You might choose to de-duplicate at the server. You know that there can only be one observation sent per hour, so if there are two within the hour that have the same values then pick just one. Easy.

However, what if you then have multiple servers, perhaps for redundancy? What then if one tower connects to one server, and another tower connects to another server? It could even be that a third party server sends us the same observations given a roaming style of arrangement. Same observations, two towers, two servers.

This is our reality.

You might employ clustering for your server and attempt to eliminate the duplicate state. Conflict-free Replicated Data Types (CRDTs) are great for this. This is a reasonable solution and having used CRDTs lovingly, we could just stop there.

We’ve chosen another strategy though, similar in spirit to CRDTs. However, we think it may be a bit simpler and it also allows for the other server to not be under our control as per the roaming scenario.

CRDTs have this wonderful property of always knowing how to merge i.e. there is never any conflict. Our view is that we permit conflict and eliminate it at the point of being consumed. We allow observations to be recorded indiscriminately as we don’t know what other systems will also be recording them.

The “point of being consumed” is most often when we render the data at the presentation layer. We provide the presentation layer with multiple sources of observations. We let the presentation layer de-duplicate. This is powerful as the presentation layer understands the context in which it is consumed. It is easy for it to reason that an observation that has the same value within the same hour is a duplicate and so it can be dropped. A presentation layer is a state machine.

Other consumers of observations, e.g. a service that actuates given sensor inputs, are also state machines and are also in a great position to de-duplicate. Their time window for duplicate detection could also be different from that of the presentation layer, given that actuation may occur over a shorter or longer period of time.
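
A minimal sketch of that kind of consumer-side de-duplication, assuming an observation can be identified by its sensor and value and that any repeat within the consumer's chosen window is a duplicate; the types here are simplifications.

use std::collections::HashMap;
use std::time::Duration;

// A consumer-side de-duplicator: an observation with the same sensor and
// value seen again within `window` is treated as a duplicate and dropped.
struct Deduplicator {
    window: Duration,
    // (sensor id, value) -> seconds-since-epoch of the last accepted observation
    last_seen: HashMap<(String, i64), u64>,
}

impl Deduplicator {
    fn new(window: Duration) -> Self {
        Self { window, last_seen: HashMap::new() }
    }

    // Returns true if the observation should be kept, i.e. it is not a duplicate.
    fn accept(&mut self, sensor: &str, value: i64, timestamp_secs: u64) -> bool {
        let key = (sensor.to_string(), value);
        let duplicate = self
            .last_seen
            .get(&key)
            .map_or(false, |&prev| timestamp_secs.saturating_sub(prev) < self.window.as_secs());
        if !duplicate {
            self.last_seen.insert(key, timestamp_secs);
        }
        !duplicate
    }
}

The presentation layer might construct this with a one-hour window (Duration::from_secs(3600)), while an actuating service could choose a shorter or longer one.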

Oh, and if you’re at all worried about the number of duplicates being sent to the presentation layer then: a) generally don’t worry (measure the effect); or b) de-duplicate further upstream.

And that’s it really.

Thursday, November 30, 2017

Landlord can reduce the cost of running microservices

When running Spring's PetClinic Spring Boot reference application, my experimental Landlord project appears to reduce the memory required for a second instance of it by about a factor of five. My terse observations show that PetClinic requires about 250 MiB of resident memory when running as a standalone Java 8 application. When running two instances of it hosted by Landlord, approximately 342.2 MiB of resident memory is consumed in total, i.e. Landlord + PetClinic + PetClinic.

These observations illustrate that sharing JVM machinery has positive impacts on hosting services. While running multiple PetClinic applications on the same machine isn't a particularly realistic scenario, running multiple Spring Boot microservices on the same machine is. Landlord could, therefore, provide a significant cost reduction in the hosting of any JVM based microservices.

Introduction

I've been experimenting with a project I created named "Landlord". The idea of the project is to promote a multi-tenant JVM scenario with the goal of saving memory. Our standard usage of the JVM isn't particularly kind regarding memory usage, with a simple Hello World application consuming about 35 MiB of memory out of the box. This figure is about 10 times what you get with a native target. For example, the same program built via Scala Native will consume about 4.5 MiB of memory. Note that we're talking about resident memory - not the JVM heap (which will be much less than that).

I thought that it'd be fun to run the standard Spring Boot PetClinic application within Landlord in order to get a feel for Landlord's cost savings.

Steps

  1. Clone PetClinic and then perform a ./mvnw package in order to get a standalone jar.
  2. Let's determine the smallest amount of RAM that PetClinic can get away with comfortably. For this, I kept running the following java command until I stopped seeing Out Of Memory (OOM) exceptions from Java.

    java \
      -XX:+UseSerialGC \
      -Xss512k \
      -XX:-TieredCompilation \
      -XX:CICompilerCount=1 \
      -XX:MaxRAM=80m \
      -cp $(pwd)/target/spring-petclinic-1.5.1.jar \
      org.springframework.boot.loader.JarLauncher

    Note that the serial GC and other options are the result of my previous investigations into keeping JVM resident memory usage down. There are pros/cons, but the above configuration is useful when deploying to small devices such as network gateways. That said, if you can get your application running well with the above options, you're likely to run even better at a larger scale, and potentially save money given the smaller number of machines needed to host your required load. One more option I'd normally use is -Xss256k, but I observed a stack overflow, so it seems that Spring likes lots of stack.
  3. When I got to that point, I then profiled the process in order to observe the JVM heap used vs allocated vs the limit. With the above configuration, PetClinic appeared to function and I didn't observe OOM, but observing the JVM heap revealed:

    Used: 37MB Alloc.: 38MB Limit: 38MB

    That feels a bit too close for comfort and seemed to be causing the GC to kick in each time that I refreshed the page.

    So, I ended up specifying -XX:MaxRAM=100m. This then yielded:

    Used: 46MB Alloc.: 48MB Limit: 48MB
  4. Now, even though I've specified max RAM as 100 MiB, this turns out to be only a starting point for the JVM on how it should size its memory regions. On OS X, if I use the Activity Monitor's inspection for a process (double-click on a process in its memory tab) then the following is reported: Real Memory: 259.3 MB (that's its Resident Set Size - RSS). So, even though we stated that we didn't want more than 100 MiB, this is not a limit and the JVM can take more. Apparently, this is a JVM implementation consideration. I assume (hope) that -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap are much stronger than -XX:MaxRAM, or perhaps this is just some OS X JVM implementation thing... I do wish that the JVM was more predictable in this regard.
  5. Time for Landlord. Let's start it up with the same options:

    JAVA_OPTS="-XX:+UseSerialGC -Xss512k -XX:-TieredCompilation -XX:CICompilerCount=1 -XX:MaxRAM=100m" \
    daemon/target/universal/stage/bin/landlordd \
      --process-dir-path=/tmp/a
  6. Let's observe its memory via the Activity Monitor: Real Memory: 133.8 MB.
  7. Now to load PetClinic. For this, we need a script to load it into Landlord:

    #!/bin/sh
    ( \
      printf "l" && \
      echo "-cp spring-petclinic-1.5.1.jar org.springframework.boot.loader.JarLauncher" && \
      tar -c -C $(pwd)/target spring-petclinic-1.5.1.jar && \
      cat <&0 \
      ) | nc -U /var/run/landlord/landlordd.sock
  8. Upon executing the above script: Real Memory: 278.3 MB. That's just 19MiB more than when it was run as a standalone application. Connecting to Landlord via YourKit shows the heap as:

    Used: 47MB Alloc.: 48MB Limit: 48MB

    ...which is quite similar to before. There doesn't appear to be any GC thrashing either. This shouldn't be any great surprise. Its thread and heap usage is quite minimal.

    Now, using Landlord for hosting just one application is not really going to give you any great benefit. Landlord's benefits kick in when multiple applications are run. Let's run another PetClinic within Landlord.
  9. First, so that PetClinic's ports don't clash, declare a random port to bind to within src/main/resources/application.properties:
    server.port=0
  10. Package the app via ./mvnw package and then invoke the Landlord script for PetClinic as before.

    Unfortunately, this doesn't work... the embedded Tomcat of Spring Boot throws an exception:

    Caused by: java.lang.Error: factory already defined
    at java.net.URL.setURLStreamHandlerFactory(URL.java:1112) ~[na:1.8.0_131]

    While we're at it, there are a couple of places where Spring declares shutdown hooks. Landlord warns of this with the following output:

    Warning: Shutdown hooks are not applicable within landlord as many applications reside in the same JVM. Declare a `public static void trap(int signal)` for trapping signals and catch `SecurityException` around your shutdown hook code.

    Clearly, there are some changes in order for an application to run within a multi-tenant environment. The PetClinic/Spring Boot environment is built to assume that it is running within its own process. Going forward, I believe it would be easy for the Spring Boot project to cater for these multi-tenancy concerns. For now, we change the PetClinic application to use Jetty instead of Tomcat. To do this, we follow the recipe of Spring Boot's documentation.
  11. Once the Jetty version is running, I observed the standalone Java process at: Real Memory: 245.9 MB. Under Landlord, the same package + Landlord: Real Memory: 290.9 MB. A bit more of a difference compared to the 278.3 MiB for the Tomcat-based package + Landlord, but who knows what the JVM is doing... perhaps we can put this down to some JVM anomaly.

    Now, if we try to run another PetClinic within Landlord then we get an OOM error. Clearly, the JVM needs more memory, not having had much headroom before. Let's re-run Landlord with -XX:MaxRAM=120m (20 MiB more overall).

    We now get a problem given a clash of JMX endpoints, so we turn them off (src/main/resources/application.properties: spring.jmx.enabled=false) and try again.

    Real Memory: 342.2 MB

    That's just 51.3 MiB of additional RSS, compared with the 245.9 MiB it would take to run an additional PetClinic outside of Landlord. Landlord, at least in this simple observation, is reducing the memory cost by about a factor of 5.
This has been a simple test and I welcome feedback on improving it.

Sunday, October 16, 2016

When closed source should become open

I've been thinking for some time about when closed source should become open, particularly in the context of when your core business is about producing software. If your core business is to provide a service such as movies, as in the case of Netflix, then the dynamics are different: because the core business is movies rather than software, you can simply go OSS and reap the benefits of having done so (as Netflix indeed have).

Before I start, I should state that my views here don't describe the only reasons to go with open software; there can be other reasons, of course. Indeed, there are many valid reasons to start with open as well. This post just investigates the closed-to-open transition, and when to make it.

When your business is about producing software, you're producing software assets that contain costly intellectual property. I'm a massive fan of open software and I've made many contributions in that space. However, a software business also needs to make money, of course.

I assert that there is a very limited window of opportunity for a software business to retain a software asset as closed, and that window is governed by the open competition that it faces. The job of the software business, then, is to stay ahead of the open curve, yet yield to open software when it starts to become a threat. This happened with Java when it was threatened by Apache Harmony. I believe that Harmony subsequently died precisely because of Sun OSS-ing Java.

I should state right now that my thoughts have been influenced by Danese Cooper, who gave a great talk on this very subject during Scala Days 2015. Danese discussed why open languages win, and I think her talk has a wider application.

When discussing the subject of open vs closed software with colleagues at Lightbend over the past year or two, I've described closed software as resting on tectonic plates. As these plates move around, the closed software at the edge falls off into the abyss of open software! I think the analogy is mostly useful, though, to illustrate that the world changes. Because of this, you must regularly re-evaluate the open competition. If you have closed software solving a particularly useful/important problem then you can be fairly certain that open software will rise around it (again, thinking of what Danese said here).

Open your commercial software and neutralise its open competition, also reaping the benefits of having gone open. Focus on adding higher level value building out from your core. Stay ahead of the game.

You certainly can't sit still.

Thursday, July 14, 2016

Microservices: from development to production


Let’s face it, microservices sound great, but they’re sure hard to set up and get going. There are service gateways to consider, service discovery to set up, consolidated logging, rolling updates, resiliency concerns… the list is almost endless. Distributed systems benefit the business, not so much the developer.

Until now.

Whatever you think of sbt, the primary build tool of Lagom, it is a powerful beast. As such we’ve made it do the heavy lifting of packaging, loading and running your entire Lagom system, including Cassandra, with just one simple command:


sbt> install


This “install” command will introspect your project and its sub-projects, generate configuration, package everything up, load it into a local ConductR cluster and then run it all. Just. One. Command. Try doing that with your >insert favourite build tool here<!

Lower level commands also remain available so that you can package, load and run individual services on a local ConductR cluster in support of getting everything right before pushing to production.

Lagom is aimed at making the developer productive when developing microservices. The ConductR integration now carries that same goal through to production.

Please watch the 8-minute video for a comprehensive demonstration, and be sure to visit the “Lagom for production” documentation in order to keep up to date with your production options. While we aim for Lagom to run with your favourite orchestration tool, we think you’ll find the build integration for ConductR hard to beat. Finally, you can focus on your business problem, and not the infrastructure to support it in production.

Enjoy!