
2021-12-21

Raspberry Pi, Home Assistant controlled On Air Light

On air light


This project was to make an On Air light that would automatically come on whenever I'm in a meeting or call on my Mac. This was achieved using Home Assistant, Home Assistant Companion for Mac, Mosquitto, a Raspberry Pi, a two-channel relay module and an On Air light.


Setup

Home Assistant, the Home Assistant companion and Mosquitto all need to be installed and configured to connect together. In my case I installed everything on my Mac but it's probably more common to have Home Assistant and Mosquitto co-hosted on another server. The Home Assistant companion is installed from the App Store. It adds your Mac as a Device and within it are sensors for various things including the microphone. This allows you to make automations for when the microphone is active/inactive.


In this automation the "Microphone In Use turned on" trigger is used with an MQTT action to simply publish a payload to a topic:


We're sending "flash" to the "home/onair" topic.


Wiring

The On Air light has a button which cycles through three states: ON, FLASH and OFF. I took it apart and found that it's a simple button that completes a circuit when pressed. Each momentary press/release makes and breaks the circuit. This meant it was pretty easy to achieve the same action with a GPIO-controlled relay. The orange and yellow wires go from the switch points to the relay module, which is wired up to the Raspberry Pi.




Code

The onair.py script uses two simple libraries to control the GPIO and to work with MQTT: RPi.GPIO and paho-mqtt. It uses the payload received on the topic to advance the on air light's state from whatever it currently is to the desired state. As there are three states: ON, FLASH and OFF, this could mean two "button presses" - to get to OFF from ON you have to go through FLASH.
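
As a rough idea of the shape of that script - not the exact onair.py - here's a minimal sketch in paho-mqtt 1.x style. The "home/onair" topic comes from the automation above; the relay pin, broker address and pulse timing are assumptions:

import time
import RPi.GPIO as GPIO
import paho.mqtt.client as mqtt

RELAY_PIN = 17              # assumption: whichever GPIO drives the relay input
STATES = ["ON", "FLASH", "OFF"]
current = "OFF"

GPIO.setmode(GPIO.BCM)
GPIO.setup(RELAY_PIN, GPIO.OUT, initial=GPIO.LOW)

def press_button():
    """Momentarily close the relay to mimic a button press/release.
    Depending on the relay board the active level may be inverted."""
    GPIO.output(RELAY_PIN, GPIO.HIGH)
    time.sleep(0.2)
    GPIO.output(RELAY_PIN, GPIO.LOW)
    time.sleep(0.2)

def advance_to(target):
    """Press the button until the light's ON -> FLASH -> OFF cycle reaches the target."""
    global current
    while current != target:
        press_button()
        current = STATES[(STATES.index(current) + 1) % len(STATES)]

def on_message(client, userdata, msg):
    target = msg.payload.decode().upper()
    if target in STATES:
        advance_to(target)

client = mqtt.Client()
client.on_message = on_message
client.connect("homeassistant.local")   # assumption: wherever Mosquitto runs
client.subscribe("home/onair")
client.loop_forever()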


That's it!

When I start a call in Teams, Zoom, Chime, etc., the "Microphone In Use turned on" trigger publishes the flash message to the "home/onair" topic. The Raspberry Pi receives this and advances the state to FLASH, which pulses the relay and sets the On Air light flashing. When the call is over, the "Microphone In Use turned off" trigger is used to set the On Air light to OFF.

2021-02-27

Practical ML end-to-end project: labelling, training, inference, serverless deployment

The End

The goal of this project was to make a system that could play the card game Quiddler alongside human players. The reason for doing it was to have a vehicle to learn new technology. Therefore, to make this challenging I wanted a cloud hosted system to read the physical cards from a camera and deduce the best play.

And here's the result: quiddler.jerbly.net


When you visit this web page you can open the webcam and then take a photo of the cards in your hand and the card on the deck. Provide the number of cards in your hand as a hint and then ask for the best play. The screenshot shows that the cards have been recognised successfully and the suggested play is to pick up the deck card "o" and drop the "a", then make the words "xenon" and "udo" for a score of 37.

So what happened?

  1. quiddler.jerbly.net is a custom domain assigned to an Azure Static Web App.
  2. The vue.js web app takes the webcam images and sends them to an Azure Function App.
  3. The function runs an object detection model over the image to find the letters on the cards and returns them to the web app. This IceVision model was built using hand labelled photos and trained using AzureML.
  4. The web app then sends the letters to a second function to get the suggested play.
  5. This second function uses a recursive searching method over a prefix tree index of a dictionary.
  6. The result is sent back to the page and rendered.  

There's a lot going on here. This long blog post will take you through the journey:

  • Object detection: Data labelling, model training and inference on AzureML.
  • The Game - using prefix trees to find the best play
  • Using a serverless Azure Function App for Object Detection Inference and the game
  • Creating and hosting the vue.js web app as an Azure Static Web App

Object Detection

My first attempt at a system to read the cards was through OCR. Surely reading characters is a solved problem? I went down a rabbit hole here looking at classic CV techniques and I even recreated the EAST paper so I could train my own text detector. I learned a lot about deep neural networks, pytorch, fastai and machine learning in general doing this, but it didn't make a good Quiddler card recognizer! EAST gives you rotated bounding boxes over areas of the scene that look like text. The idea was to get these rboxes, cut them out, rotate them back to horizontal, filter and pass to Tesseract. It was not great.

Then it dawned on me to stop seeing the characters on the cards as text and instead see them as things, objects. What I needed was a good object detection framework with built-in image augmentation. When studying deep learning it's common to train a network to recognize cats and dogs, the idea being that if I show it enough cats and dogs it learns cattiness and doggy-ness. Now if I show it a picture of a cat it's never seen before it'll recognize it as a cat since it has a higher score of cattiness. In that experiment you spend a lot of time worrying about generalizing for any cat, not just the ones in the training set. In the case of these cards though it's subtly different. I only want to detect this exact text: if it's an A, I'm not trying to generalize for all different fonts or handwriting; I only need to detect the A as it appears on these cards. The variations which do need to be covered are to do with how the cards are held up to the camera: rotation, zoom, lighting and perspective warping. Since these are relatively easy to synthesize through image augmentation I didn't have to go crazy and take 10,000 photos and label them. In fact I only used 120 photos for 31 different cards!

Labelling

I used an iPhone, a webcam and VGG Image Annotator to make the dataset. Since I started on the EAST project and rotated boxes, I had marked these up as polygons rather than flat boxes. This doesn't matter though as you can easily put a flat box around a polygon by just finding the min and max.
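
For example, the conversion is just a min/max over the region's points (a sketch, not the actual parser code):

def polygon_to_bbox(xs, ys):
    """xs, ys: the polygon's x and y point lists from the VIA region."""
    return min(xs), min(ys), max(xs), max(ys)  # xmin, ymin, xmax, ymax

print(polygon_to_bbox([10, 40, 35], [5, 8, 30]))  # (10, 5, 40, 30)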

It's pretty amazing what you can achieve with such a small dataset. Here's a screenshot from the VIA grid view of the entire labelled training set:

IceVision

I found the IceVision object detection framework via the fastai Discord. It seemed to be the perfect fit for my problem making it really simple to train a faster-rcnn model on my dataset. All that was needed was a parser for the VIA json output. This was originally written standalone but I later did a PR to add this to the library.

Training

Training went well locally on my TensorBook but I was interested to try this out on AzureML to capture statistics from multiple training runs and to use the cloud compute for larger batch sizes (the RTX 2080 in the TensorBook has 8GB). The AzureML Quiddler notebook shows you how to upload and register a dataset, train the model and then register the best performing model in your workspace. The main concept in AzureML is that you write a training Python script that takes in arguments for hyperparameters etc. You then provide the SDK with an environment containing the conda or pip dependencies and it builds and registers a Docker container for you. Then you launch the script via the container with the parameters you want on the compute you specify. The compute can be your local machine or a cluster in your AzureML workspace.
The gist above runs the training.py script with 3 epochs, batch_size 6 etc. on local compute. The default base image for the environment contains Python 3.6 on Ubuntu 16.04. To match my dev environment I wanted Python 3.8 on Ubuntu 18.04. To achieve this you need to specify an alternate base image and pin Python 3.8 in the dependencies yaml file:
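
(A sketch of the idea, assuming the v1 azureml-core SDK - the environment name, yaml filename and base image tag are placeholders, and the Python 3.8 pin lives in the yaml itself.)

from azureml.core import Workspace, Experiment, Environment, ScriptRunConfig

ws = Workspace.from_config()

# The conda yaml pins python 3.8; swap the default base image for an Ubuntu 18.04 one
env = Environment.from_conda_specification("quiddler-env", "environment.yml")
env.docker.base_image = "mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04"

config = ScriptRunConfig(
    source_directory=".",
    script="training.py",
    arguments=["--epochs", 3, "--batch_size", 6, "--data_path", "./data"],
    compute_target="local",   # or the name of a cluster in the workspace
    environment=env,
)

run = Experiment(ws, "quiddler-training").submit(config)
run.wait_for_completion(show_output=True)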

Dataset

Going back to the launch code for a moment you'll see the input_data argument. Here I pass it a dataset to be used by the training script as a download since I only have 120 photos. Alternatively you can ask for this to be a mount when you have a larger dataset. AzureML takes care of getting the dataset in a location for you and provides the script with a path.
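
Roughly, the dataset registration and hand-off look like this (a sketch assuming the v1 SDK; the datastore path and dataset name are made up):

from azureml.core import Dataset

datastore = ws.get_default_datastore()
datastore.upload(src_dir="./photos", target_path="quiddler-photos")

dataset = Dataset.File.from_files((datastore, "quiddler-photos"))
dataset = dataset.register(ws, name="quiddler-photos")

# Passed through ScriptRunConfig arguments; the script receives a local path.
# Use .as_mount() instead of .as_download() for larger datasets.
input_data = dataset.as_named_input("input_data").as_download()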

To look at the registered dataset you can use the AzureML Workspace via the Portal:

 

Statistics

While the training is running you get a nice view inside your notebook. To make this really useful you can use a group of logging functions from the Run class inside your script to post data to your training run. By hooking this up to the fastai callback system you can log your losses and metrics for each epoch quite easily:
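
(A sketch in fastai v2 style - the actual code lives in the helper library mentioned below and may differ.)

from azureml.core import Run
from fastai.callback.core import Callback

class AzureMLLogger(Callback):
    "Push each epoch's losses and metrics into the AzureML run."
    def before_fit(self):
        self.azure_run = Run.get_context()

    def after_epoch(self):
        # metric_names is ['epoch', 'train_loss', 'valid_loss', ..., 'time'];
        # values[-1] holds the numbers for everything between those two.
        names = self.recorder.metric_names[1:-1]
        for name, value in zip(names, self.recorder.values[-1]):
            self.azure_run.log(name, float(value))

# learn.fit_one_cycle(epochs, lr, cbs=[AzureMLLogger()])
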
Then the notebook widget produces this:
I am working on another project to make an AzureML / fastai / IceVision helper library where you can get this and more. So by the time you're reading this there could be another dependency.

Not only do the statistics go to your notebook but they're also logged in your AzureML workspace and visible in the portal. You can then do neat things like compare experiment runs:
The Run class also contains log methods for other data types like tables and arrays. You can also log images - at the end of the training I run inference on a test set and log the output as a table with its images:
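
(The calls involved look roughly like this - a sketch with illustrative column names and paths, not the exact code from the training script.)

from azureml.core import Run

run = Run.get_context()

run.log_table("test_results", {
    "image": ["hand_01.jpg", "hand_02.jpg"],
    "predicted": ["a/b/c", "qu/i/t"],
    "correct": [True, True],
})

for path in ["preds/hand_01.jpg", "preds/hand_02.jpg"]:
    run.log_image(name="test_predictions", path=path)
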
You can then look at these in the portal:

Results

My best run as determined by my test set accuracy ran for 200 epochs in 35 minutes with a small batch size, 6. This run correctly identified every card in the test set. I wanted a practical way to confirm that the model was going to be good enough though so I created a quick script to predict cards in real-time from a webcam. This allowed me to manipulate the cards in front of the camera and spot any weaknesses:

The Game

On your turn you need to try to make the best scoring word or words from your hand. You can also substitute one of your cards with the card face up on the discard pile. Or choose the next unseen card from the face-down deck. The complication for the algorithm is that you're not trying to make the longest single word but to use up all the cards in your hand on multiple words. Also, some of the cards are double-letter cards, IN for example.

Prefix Trees are used to hold the structure for all possible words and the permutations given the double-letter cards. For example the word: "inquiring" can be constructed from the cards in 8 ways:

            'in/qu/i/r/in/g': 36,
            'in/qu/i/r/i/n/g': 36,
            'in/q/u/i/r/in/g': 46,
            'in/q/u/i/r/i/n/g': 46,
            'i/n/qu/i/r/in/g': 36,
            'i/n/qu/i/r/i/n/g': 36,
            'i/n/q/u/i/r/in/g': 46,
            'i/n/q/u/i/r/i/n/g': 46

As the game progresses you start each round with an increasing number of cards in your hand. The last round has 10 cards. The implementation takes the hand cards and deck card and suggests the best play as a result.

Hand:     a/cl/g/in/th/m/p/o/u/y
Deck:     n
Score:    58
Complete: True
Words:    ['cl/o/th', 'm/u/n/g', 'p/in/y']
Pick up:  n
Drop:     a
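
To give a flavour of the search - not the full solver, which also partitions the hand into multiple words and handles the deck card swap - here's a sketch using pygtrie: build a trie of the dictionary, then recursively try each remaining card as the next chunk of the prefix, pruning whenever the trie says no word starts that way. The scores dict is only partially filled in for illustration.

import pygtrie

SCORES = {"a": 2, "g": 4, "i": 2, "n": 2, "qu": 9, "r": 5, "in": 2}  # partial, illustrative

trie = pygtrie.CharTrie()
with open("sowpods.txt") as f:
    for word in f:
        trie[word.strip().lower()] = True

def find_words(cards, prefix="", used=()):
    """Yield (word, cards_used) for every dictionary word buildable from cards."""
    if prefix and trie.has_key(prefix):
        yield prefix, used
    if prefix and not trie.has_subtrie(prefix):
        return  # nothing in the dictionary starts with this prefix - prune
    for i, card in enumerate(cards):
        rest = cards[:i] + cards[i + 1:]
        for found in find_words(rest, prefix + card, used + (card,)):
            yield found

hand = ["in", "qu", "i", "r", "i", "n", "g"]
best = max(find_words(hand), key=lambda w: sum(SCORES.get(c, 0) for c in w[1]))
print(best[0], sum(SCORES.get(c, 0) for c in best[1]))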

Inference Service

AzureML has a few different ways that you can deploy a model for inference. This notebook shows the flow and interaction with a local, ACI (Azure Container Instance) and AKS (Azure Kubernetes Service) deployed AzureML service. 

The pattern is very similar to what we did for training: define an environment, write a scoring script, deploy.

The dependencies for the environment are similar to those used for training but we can use icevision[inference] instead of icevision[all] to reduce the size a bit. Plus we need to add pygtrie for the prefix tree code. I did hit a problem here though that might help people out. I wanted to use a CPU Ubuntu 18.04 base image but for some reason it threw errors about a missing library. Thankfully the AzureML SDK allows you to specify an inline dockerfile so we can add the missing library:
Note that we're defining the scoring script, score.py, in the InferenceConfig. This contains two functions which you need to define: init() and run(). In the init() function you load the model and any other initialization, then run() is called every time a request comes in through the web server.
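
The shape of score.py is roughly this (a sketch - the IceVision loading and prediction calls are stand-in placeholders, and AZUREML_MODEL_DIR is the directory AzureML mounts the registered model into):

import json
import os

model = None

def init():
    global model
    model_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), "quiddler.pt")
    model = load_quiddler_model(model_path)       # placeholder for the IceVision load

def run(raw_data):
    payload = json.loads(raw_data)
    hand = predict_cards(model, payload["hand"])  # placeholder helpers
    deck = predict_cards(model, payload["deck"])
    return {"hand": hand, "deck": deck}
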
To consume the service you just need to know the service_uri and optionally the access key. You can get these from the portal or by calling get_keys() and grabbing the service_uri property from the service object returned when you call Model.deploy(). The gist below shows deploying to AKS and getting those properties:
Note I'm referring here to an inference cluster named jb-inf which I previously set up through the portal.
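
A minimal sketch of that deployment flow, assuming the v1 azureml-core SDK (the service and registered model names are placeholders; jb-inf is the cluster mentioned above and env is the inference environment built from the yaml):

from azureml.core.compute import AksCompute
from azureml.core.model import InferenceConfig, Model
from azureml.core.webservice import AksWebservice

inference_config = InferenceConfig(entry_script="score.py", environment=env)
deployment_config = AksWebservice.deploy_configuration(cpu_cores=1, memory_gb=4)

service = Model.deploy(
    workspace=ws,
    name="quiddler-svc",
    models=[Model(ws, "quiddler")],
    inference_config=inference_config,
    deployment_config=deployment_config,
    deployment_target=AksCompute(ws, "jb-inf"),
)
service.wait_for_deployment(show_output=True)

print(service.scoring_uri)       # the URI to post requests to
key, _ = service.get_keys()      # access keys for the AKS service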

Finally we can call the web service and get the results. The scoring script takes a json payload with base64 encoded images for the hand and the deck and a hint for the number of cards in the hand:
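
(Calling it looks something like this - a sketch where the payload field names are assumptions about the scoring script above, and scoring_uri/key come from the previous snippet.)

import base64
import json
import requests

def b64(path):
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode()

payload = {"hand": b64("hand.jpg"), "deck": b64("deck.jpg"), "num_cards": 5}
headers = {"Content-Type": "application/json", "Authorization": "Bearer " + key}
resp = requests.post(service.scoring_uri, data=json.dumps(payload), headers=headers)
print(resp.json())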

Serverless

The inference service through AKS is great for serious usage but it's not a cost effective way to host a hobby service. Your AKS cluster can scale down pretty low but it can't go to zero. Therefore you'd be paying for 24x7 uptime even if no one visits the site for months! This is where serverless can really shine. Azure Function Apps on the Consumption Plan can scale to zero and you only pay for the time when your functions are running. On top of this there's a significant level of free tier usage per month before you would have to start paying anyway. I haven't paid a cent yet!

So what's the downside? Cold-start:
"Apps may scale to zero when idle, meaning some requests may have additional latency at startup. The consumption plan does have some optimizations to help decrease cold start time, including pulling from pre-warmed placeholder functions that already have the function host and language processes running."

Here's some data taken from running a few invocations in a row on the card detection function which I'll go through next:

As you can see, there's a slow invocation taking 50 seconds, then all subsequent ones take 10.

Function App

Azure Function Apps are essentially hosts that contain multiple functions. This is a little different if you are used to AWS Lambda functions. As you can imagine there are many different ways to develop, test and deploy Azure Functions but I have found using the Azure Functions extension for VSCode is really nice. There are great getting started guides that I won't repeat here.

When you're working with Azure functions you define settings for the host in the host.json and for python you have a single requirements.txt file to define the dependencies for the host. I decided to divide the problem of finding the best play for a set of cards into two functions. One, cards, that does the object detection on the hand and deck and a second, game, that takes two strings for the hand and deck and returns the best play. This basically splits what we had earlier from the single scoring script in the AzureML AKS deployment. Here's what the directory tree looks like:

├── cards
│   ├── function.json
│   ├── __init__.py
│   └── quiddler.pt
├── game
│   ├── function.json
│   ├── __init__.py
│   ├── quiddler_game.py
│   └── sowpods.txt
├── host.json
├── local.settings.json
└── requirements.txt

You can see the layout with the two functions and the host. Note, in the cards folder there's quiddler.pt - the trained model, and in the game folder there's sowpods.txt - the word dictionary. These are both heavyweight items with some overhead to initially load and process. In both cases we use a global in the entry-point function, main(), to lazy-initialize just once. Subsequent calls to this running instance will not need to perform this initialization:
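
(The pattern, roughly - a sketch of cards/__init__.py where the loading and prediction helpers are placeholders.)

import json
import logging
import azure.functions as func

model = None   # module-level: survives while this instance stays warm

def main(req: func.HttpRequest) -> func.HttpResponse:
    global model
    if model is None:
        logging.info("Cold start: loading quiddler.pt")
        model = load_model("quiddler.pt")        # placeholder for the real load
    result = predict(model, req.get_json())      # placeholder
    return func.HttpResponse(json.dumps(result), mimetype="application/json")
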
The game function is quite straightforward. Let's look at the cards function. There are a couple of changes from the AKS implementation mostly to support the web app front-end that we'll get to later. Support has been added for URLs - if you pass in a URL instead of a base64 encoded image it will fetch it. We're also now returning the images with the prediction mark-up to display in the UI. This borrowed a bit of code from the training run when we uploaded test result images to the workspace:
A final note on the requirements.txt - to save some load time we want to keep the dependencies fairly tight. So CPU pytorch libraries are loaded and icevision with no additional options:
After deploying to Azure you can see the functions in the portal and in VSCode:
If you right-click on the function name you can select "Copy Function Url" to get the api end-point. In my case for the cards it's: https://quiddler.azurewebsites.net/api/cards

Now we can post a json payload to this URL and get a result, great! So let's make a front-end to do just that:

Static Web App

The Static Web Apps service in Azure is relatively new. At the time of writing it's still marked as Preview. Basically we want to host some html, css, javascript etc. - this is achievable using a Storage Account, but this new service has some cool additions.

First things first, let's take a look at the vue.js web app. Front-end design is not my strength, so let's just focus on how we call the serverless functions and display the results. In main.js this is handled with uploadImages(). This builds the json payload and uses axios to send it to the URL of the function we discovered earlier. If this call is successful the marked-up card images are displayed and the strings representing the detected cards are sent to the game serverless function. If this second call is successful, the results are displayed.

A really useful feature of all the Azure services used in this blog post is that they can all be developed and tested locally before you deploy. You saw how the training and inference in AzureML can all be done locally. Function Apps can run up in a local environment and you can debug right in VSCode. One complication with this is CORS. When developing locally you need to define the CORS setting for the function app in local.settings.json:

With this in place you can just open the index.html page as a file in Chrome and temporarily set the api URLs to the local endpoint. e.g. http://localhost:7071/api/

The new Static Web App features GitHub Actions based deployment. So, once you're all set with the local development you can use the VSCode extension and provide access to the repo containing your app. It will then install an Action and deploy to Azure. Now, whenever you push to main it will deploy to production! There are also staging environments for PRs - free DevOps! You can even see the action history in VSCode:

Static Web Apps defines a domain for you that you can change by setting a Custom Domain through the portal - I assigned this to quiddler.jerbly.net. Finally, we have to set CORS in production through the portal:

Wrap-up

And that's it! Go to quiddler.jerbly.net and have a go. If you don't have a set of cards you can still see it in action by clicking "Random" for hand and deck. Oh, and if you don't recognize some of the words it comes up with (this will happen a lot), just click through the link to the Collins Dictionary definition.


2018-09-16

Serverless Machine Learning Classifier SlackBot

In this post I show the steps to build and deploy a SlackBot into AWS using Chalice. The bot uses a trained machine learning model to predict the R&D team most able to answer a question that the user has entered. Like so:

The detail on training the machine learning model will be in a later post. For now just know that text from Jira tickets assigned to scrum teams labelled A to I is used to build a classifier so we can predict a team given a question. The trained model is stored in S3.

Serverless hosting

The SlackBot is basically a web server responding to incoming requests via a rest endpoint. You could just run an Internet accessible server somewhere but this seems like a perfect application to run in AWS Lambda; each request is short-lived and stateless. The problem with AWS is there's a lot to learn to put in place all the bits and pieces you need to achieve this: Lambda functions, API Gateway, IAM roles etc. To simplify this, open source tools and frameworks can take care of a lot of this overhead. AWS Chalice is
...a microframework for writing serverless apps in python. It allows you to quickly create and deploy applications that use AWS Lambda. 
I won't repeat here what you can read in the Chalice docs. I'd suggest building a Hello World rest api to begin with and then following along with this post.

Slack client

To interact with Slack from our Python code we need to use slackclient. Since our application is already running in the Chalice framework we won't be using a lot of the Slack client framework. In fact it's only used to make the api call to return the response to the question.

The Bot does need to be registered in your Slack account. I won't step-by-step through that but here is a screenshot of the critical page showing the configuration of the Event Subscriptions:


You'll see above that there's a Request URL to fill out and in this screenshot it's referring to an AWS location. We haven't got that far yet - there's a little bit of a chicken-and-egg situation here. We have to build the bot and deploy it via Chalice to get the URL to plug in here. However, in order to deploy that bot we need some info from Slack:

  1. Basic Information / Verification Token -- (for the SLACK_VERIFICATION_TOKEN)
  2. OAuth & Permissions / Bot User OAuth Access Token -- (for the SLACK_BOT_TOKEN)

The Code

If you made the Hello World Chalice application you'll be familiar with the main components:
  • app.py - where we'll build the slackbot
  • .chalice/config.json - where we'll configure the app
  • requirements.txt - the python libs we need

This first block of code does some imports and some initialization. The imports refer to some packages which we'll need to bundle up into the deployment zip for the Lambda function. They are declared in the requirements.txt file for Chalice:
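
(Roughly, the top of app.py looks like this - a sketch where the environment variable names for the S3 location are assumptions.)

import os
import boto3
from chalice import Chalice
from slackclient import SlackClient

app = Chalice(app_name='slackbot')

SLACK_VERIFICATION_TOKEN = os.environ['SLACK_VERIFICATION_TOKEN']
SLACK_BOT_TOKEN = os.environ['SLACK_BOT_TOKEN']
MODEL_BUCKET = os.environ['MODEL_BUCKET']
MODEL_KEY = os.environ['MODEL_KEY']

slack_client = SlackClient(SLACK_BOT_TOKEN)
s3 = boto3.client('s3')
clf = None   # the scikit-learn model, loaded lazily later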


Part of the initialization is reading in environment variables. It's good practice to separate these configuration settings from your code. In addition to the Slack tokens mentioned earlier, we also specify the S3 bucket and key for the machine learning model's location. Here's an example config.json file:


There are a couple of other settings in this config file to look at:

lambda_memory_size - I've set this to 1536 MB - note that the memory allocation is used to calculate a proportional CPU power. Since we're loading and running an ML model the default allocation is not sufficient. My strategy is to start reasonably high and then tune down. The CloudWatch logs show the amount of time spent and memory used for each run - this is good information to use when tuning.

manage_iam_role and iam_role_arn - Chalice will attempt to build an IAM role for you with the correct permissions but this doesn't always work. If S3 access has not been granted on the role you can add this in the AWS console and then provide the ARN in config.json. You'll also need to set manage_iam_role to false.

Delays

Eagle-eyed readers will have noticed that in the CloudWatch log screenshot above there was a big time difference between two runs: one was 10 seconds and the next less than 1 millisecond! The reason for this is to do with the way AWS Lambda works. If the function is "cold", the first run in a new container, then we have to perform the costly process of retrieving the ML model from S3 and loading it into scikit-learn. For all subsequent calls while the function is "warm" the model will be in memory. We have to code for these "cold" and "warm" states to provide acceptable behaviour.


The code snippet above shows where we load and use the model. load_model() simply checks whether the clf has been initialized. If not it downloads the model from S3 to a /tmp location in the Lambda container and loads it. At the time of writing Lambda provides 512 MB of tmp space so watch the size of your model. While the function is "warm" subsequent calls to load_model() will return quickly without needing to load the model.
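
A sketch of that loader (sklearn.externals.joblib was the import of the era; /tmp is the only writable path in Lambda, and the model filename is a placeholder):

from sklearn.externals import joblib

def load_model():
    global clf
    if clf is None:
        s3.download_file(MODEL_BUCKET, MODEL_KEY, '/tmp/model.pkl')
        clf = joblib.load('/tmp/model.pkl')
    return clf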

You might be wondering why we didn't just load the model in at the top of script. This would only load it in once, and it would do it naturally on the first "cold" run. The problem with this is to do with how Slack interacts with a SlackBot.


The function above is called from Slack whenever there's an incoming event based on the event types we've registered for (screenshot near the top of this post). It's also called during registration as the "challenge" to check that this is the bot matching the registration. The first if statement checks that the incoming token from Slack matches our verification token. Next, if this is a challenge request, we simply turn around and send the challenge string back.

The third if statement is an interesting one, and a reason why we had to handle the model loading the way we did. In the event API doc one of the failure conditions is "we wait longer than 3 seconds to receive a valid response from your server". This also covers the "challenge" request above, so if we took 10 seconds to load the ML model we wouldn't even be able to register our bot! The doc goes on to say that failures are retried immediately, then after 1 minute and then 5 minutes. This code could deal with the various failure scenarios more thoroughly and handle each accordingly but, for simplicity and because this isn't a mission-critical app, I've written the code to simply absorb retry attempts by immediately returning with a 200 OK. The reason for this is that I was finding my first "cold" run would take longer than the 3 seconds and so a retry would come in. This might even cause AWS to spawn another Lambda function because my first one was still running. Eventually, depending on timing, I might get two or three identical answers in the Slack channel as the runs eventually completed.

So, finally we get onto the bot actually performing the inference over the incoming question and returning a string suggesting the team to contact. This is actually quite straightforward - it looks for the word "who" in the question and then passes the whole string into the predict function to get a classification. Note that load_model() is only called if "who" is found.
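
Putting that together, the events route looks roughly like this (a sketch following the earlier snippets - the exact predict call depends on the scikit-learn pipeline, and the retry absorption is simplified to checking Slack's X-Slack-Retry-Num header):

from chalice import Response

@app.route('/slack/events', methods=['POST'])
def slack_events():
    body = app.current_request.json_body

    if body.get('token') != SLACK_VERIFICATION_TOKEN:
        return Response(body='', status_code=403)

    # Registration handshake: echo the challenge straight back
    if 'challenge' in body:
        return {'challenge': body['challenge']}

    # Absorb Slack retries so a slow cold start doesn't post duplicate answers
    if app.current_request.headers.get('X-Slack-Retry-Num'):
        return Response(body='', status_code=200)

    event = body.get('event', {})
    text = event.get('text', '')
    if 'who' in text.lower():
        team = load_model().predict([text])[0]
        slack_client.api_call('chat.postMessage',
                              channel=event['channel'],
                              text='Try asking team %s' % team)
    return Response(body='', status_code=200)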

Deployment

If you ran through the Chalice Hello World app earlier you will have got to the point where you ran "chalice deploy" and then it did its magic, created all the AWS stuff for you and you were up and running. This might work for this SlackBot, but one problem I've run into is the 50MB zip size limit that AWS Lambda imposes. If your deployment zip (all the code and packages) is larger than this limit it'll fail. All is not lost though: for whatever reason, if you deploy your zip from S3 rather than as part of your upload you can go beyond this limit.

Chalice provides an alternative to the automatic deployment method allowing you to use AWS CloudFormation instead. This way of deploying has a side-benefit for this limit problem in that it uses S3 to hold the package for deployment. Calling "chalice package" rather than "chalice deploy" creates the zip and the SAM template for you to then deploy using the AWS CLI. To automate this for my SlackBot I built a bash script:


Registration

Finally, with the bot deployed we can register it with Slack. Remember that Request URL and the challenge process? Take the URL that either "chalice deploy" or "deploy.sh" printed out and add "/slack/events" to it so it looks something like this:

  • https://##########.execute-api.us-east-1.amazonaws.com/api/slack/events

Paste this into the request URL form field and Slack will immediately attempt to verify your bot with a challenge request.

We're done!

The Serverless SlackBot is up and running and ready to answer your users' questions. In the next post I'll explain how the machine learning model was created with a walkthrough of the code to collect the data from Jira, clean it, train a model on it and deliver it to S3.

Code available on Github

2015-06-14

Motion Google Drive Uploader for OAuth 2.0

Three years ago I wrote the original script to upload video to Google Drive as a trigger from Motion. Unfortunately Google recently removed support for the old security model and you now have to use OAuth 2.0. Also the gdata libraries have been superseded by the Google API Python Client. A few people have written alternatives to my original script quicker than I did, but here's my new version.

Note: I'm not using any third party libraries like PyDrive for example, this script goes straight to the Google APIs. (PyDrive has not been touched for a long time by its author and is missing features).

Motion Google Drive Uploader on Github

Installation

Google setup:


  1. Go to https://code.google.com/apis/console - make sure you're logged in as the correct user for the Drive you want to use.
  2. Use the drop-down at the top of the page to create a project called 'Uploader'.
  3. Click on "Enable an API" then "Drive API" and then click the "Enable API" button
  4. Next go to "APIs & auth -> Credentials" menu and click "Create new Client ID".
  5. Set Application type to "Installed application" and click "Configure Consent Screen"
  6. Set the Product Name to "Uploader"
  7. You'll be returned back to the dialog. Make sure "Installed application" and "other" are selected and then click "Create Client ID".
  8. Download the JSON file.
  9. Rename it to client_secrets.json and put it somewhere like /home/pi/.

Script setup:

  1. Go to /home/pi/ and get the uploader.py from github: git clone https://github.com/jerbly/motion-uploader.git
  2. Update/install Google Python API: sudo pip install --upgrade google-api-python-client
  3. Make uploader.py executable: chmod a+x uploader.py

Configuration:

  1. If you used the git clone command you'll find the example uploader.cfg file in the motion-uploader directory. This now has a couple more options which need setting up. 
  2. oauth/folder - set this to the directory where you have put the client_secrets.json file and where the script can write a credentials file. e.g. /home/pi/
  3. docs/snapshot-folder - set this to a public folder name in your drive if you wish to use this feature (explained below).

Initial authentication:

  1. From the command line run the script by hand with your config file and a test avi file e.g. ./uploader.py /home/pi/uploader.cfg /home/pi/test.avi
  2. Copy the URL to your clipboard and paste it into a browser where you are already logged in to Google as the correct user.
  3. Accept and copy the authentication code to the clipboard.
  4. Paste it back in to the command line and hit enter.
  5. This will authenticate you and create a credentials file which will be used for future logins.
Now, finally, you can run up Motion and it should work like it used to.

New feature: Public Snapshot

Motion comes with a feature to periodically take a snapshot regardless of whether motion has been detected. This is a nice feature if you want to have a web site with the latest view from your webcam. You can use Google Drive to host this image rather than installing a web server on your Pi and opening firewalls etc.
  1. Create a public folder in your Google Drive: 
    1. Create a new folder called 'public'
    2. Double click on it
    3. Go to the sharing menu (person icon with + sign)
    4. Click "Advanced" and then "Change..."
    5. Set it to "On - Public on the Web"
  2. Configure your uploader.cfg so docs/snapshot-folder is set to this folder 'public'.
  3. Configure motion.conf to take a snapshot every n seconds named lastsnap.jpg and upload it:
    1. snapshot_interval 300
    2. snapshot_filename lastsnap
    3. on_picture_save /home/pi/motion-uploader/uploader.py /home/pi/uploader.cfg %f snap
To find the public URL that you can bookmark and embed in other pages run the command line with the 'snapurl' option: ./uploader.py /home/pi/uploader.cfg /home/pi/lastsnap.jpg snapurl

It will print something like this: https://googledrive.com/host/{your-folder-id-here}/lastsnap.jpg


2014-09-01

Raspberry Pi Pygame UI basics

Stage 1 setup

  1. Follow Adafruit instructions for PiTFT
  2. Get the tutorial code: git clone https://github.com/jerbly/tutorials.git

Test the setup

pi@raspberrypi ~ $ sudo python
Python 2.7.3 (default, Mar 18 2014, 05:13:23)
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import pygame
>>> import os
>>> os.putenv('SDL_FBDEV', '/dev/fb1')
>>> pygame.init()
(6, 0)
>>> lcd = pygame.display.set_mode((320, 240))
>>> lcd.fill((255,0,0))
<rect(0, 0, 320, 240)>
>>> pygame.display.update()
>>> pygame.mouse.set_visible(False)
1
>>> lcd.fill((0,0,0))
<rect(0, 0, 320, 240)>
>>> pygame.display.update()
You can also run this test (with a one second sleep) from the pygamelcd project: sudo python test1.py


From GPIO to screen

So, we can paint colours on the screen - let's do this from GPIs!

We'll use the four tactile buttons along the bottom of the screen to draw the GPIO number and a coloured background. From left to right the buttons correspond to GPIO #23, #22, #27, and #18.

(Note: If you have a revision 1 board then #27 is #21 - you'll just have to change the code a little)








import pygame
import os
from time import sleep
import RPi.GPIO as GPIO

#Note #21 changed to #27 for rev2 Pi
button_map = {23:(255,0,0), 22:(0,255,0), 27:(0,0,255), 18:(0,0,0)}

#Setup the GPIOs as inputs with Pull Ups since the buttons are connected to GND
GPIO.setmode(GPIO.BCM)
for k in button_map.keys():
    GPIO.setup(k, GPIO.IN, pull_up_down=GPIO.PUD_UP)

#Colours
WHITE = (255,255,255)

os.putenv('SDL_FBDEV', '/dev/fb1')
pygame.init()
pygame.mouse.set_visible(False)
lcd = pygame.display.set_mode((320, 240))
lcd.fill((0,0,0))
pygame.display.update()

font_big = pygame.font.Font(None, 100)

while True:
    # Scan the buttons
    for (k,v) in button_map.items():
        if GPIO.input(k) == False:
            lcd.fill(v)
            text_surface = font_big.render('%d'%k, True, WHITE)
            rect = text_surface.get_rect(center=(160,120))
            lcd.blit(text_surface, rect)
            pygame.display.update()
    sleep(0.1)

You can also run this from the pygamelcd project: sudo python test2.py


From screen to GPIO

The PiTFT from Adafruit is a touchscreen. So let's see how we get input from the screen. We'll use this to light some LEDs on the breadboard.

With the PiTFT installed and the 4 tactile buttons there aren't many GPIs left on the model B Raspberry Pi. So wire up #17 and #4. The software renders 4 labels on the screen and then looks for mouse events in the four quarters:









import pygame
from pygame.locals import *
import os
from time import sleep
import RPi.GPIO as GPIO

#Setup the GPIOs as outputs - only 4 and 17 are available
GPIO.setmode(GPIO.BCM)
GPIO.setup(4, GPIO.OUT)
GPIO.setup(17, GPIO.OUT)

#Colours
WHITE = (255,255,255)

os.putenv('SDL_FBDEV', '/dev/fb1')
os.putenv('SDL_MOUSEDRV', 'TSLIB')
os.putenv('SDL_MOUSEDEV', '/dev/input/touchscreen')

pygame.init()
pygame.mouse.set_visible(False)
lcd = pygame.display.set_mode((320, 240))
lcd.fill((0,0,0))
pygame.display.update()

font_big = pygame.font.Font(None, 50)

touch_buttons = {'17 on':(80,60), '4 on':(240,60), '17 off':(80,180), '4 off':(240,180)}

for k,v in touch_buttons.items():
    text_surface = font_big.render('%s'%k, True, WHITE)
    rect = text_surface.get_rect(center=v)
    lcd.blit(text_surface, rect)

pygame.display.update()

while True:
    # Scan touchscreen events
    for event in pygame.event.get():
        if event.type == MOUSEBUTTONDOWN:
            pos = pygame.mouse.get_pos()
            print pos
        elif event.type == MOUSEBUTTONUP:
            pos = pygame.mouse.get_pos()
            print pos
            #Find which quarter of the screen we're in
            x,y = pos
            if y < 120:
                if x < 160:
                    GPIO.output(17, False)
                else:
                    GPIO.output(4, False)
            else:
                if x < 160:
                    GPIO.output(17, True)
                else:
                    GPIO.output(4, True)
    sleep(0.1)


Stage 2 setup

We're now going to improve the UI by introducing a widget framework, PygameUI.

1. Update your version of distribute: sudo easy_install -U distribute
2. Install PygameUI: sudo pip install pygameui

PygameUI GPIOs



This example controls GPIO #17 and #4 as above but now we're using the new framework.

The widget rendering and touchscreen events are handled by PygameUI. The PiTft class defines the buttons to draw on screen and the click event to be fired when a button is pressed.










import pygame
import os
import pygameui as ui
import logging
import RPi.GPIO as GPIO

#Setup the GPIOs as outputs - only 4 and 17 are available
GPIO.setmode(GPIO.BCM)
GPIO.setup(4, GPIO.OUT)
GPIO.setup(17, GPIO.OUT)

log_format = '%(asctime)-6s: %(name)s - %(levelname)s - %(message)s'
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(log_format))
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)

os.putenv('SDL_FBDEV', '/dev/fb1')
os.putenv('SDL_MOUSEDRV', 'TSLIB')
os.putenv('SDL_MOUSEDEV', '/dev/input/touchscreen')

MARGIN = 20

class PiTft(ui.Scene):
    def __init__(self):
        ui.Scene.__init__(self)

        self.on17_button = ui.Button(ui.Rect(MARGIN, MARGIN, 130, 90), '17 on')
        self.on17_button.on_clicked.connect(self.gpi_button)
        self.add_child(self.on17_button)

        self.on4_button = ui.Button(ui.Rect(170, MARGIN, 130, 90), '4 on')
        self.on4_button.on_clicked.connect(self.gpi_button)
        self.add_child(self.on4_button)

        self.off17_button = ui.Button(ui.Rect(MARGIN, 130, 130, 90), '17 off')
        self.off17_button.on_clicked.connect(self.gpi_button)
        self.add_child(self.off17_button)

        self.off4_button = ui.Button(ui.Rect(170, 130, 130, 90), '4 off')
        self.off4_button.on_clicked.connect(self.gpi_button)
        self.add_child(self.off4_button)

    def gpi_button(self, btn, mbtn):
        logger.info(btn.text)
        
        if btn.text == '17 on':
            GPIO.output(17, False)
        elif btn.text == '4 on':
            GPIO.output(4, False)
        elif btn.text == '17 off':
            GPIO.output(17, True)
        elif btn.text == '4 off':
            GPIO.output(4, True)

ui.init('Raspberry Pi UI', (320, 240))
pygame.mouse.set_visible(False)
ui.scene.push(PiTft())
ui.run()


Analog input



This next example uses a 10K potentiometer to provide a varying voltage. For analog to digital I normally use an MCP3008 over SPI. Unfortunately the downside to the Pi TFT touchscreen is that both SPI channels on the Pi are in use. So I've switched to an I2C ADC from Adafruit: ADS1115 16-Bit ADC - 4 Channel with Programmable Gain Amplifier.
Get the Adafruit Python library: git clone https://github.com/adafruit/Adafruit-Raspberry-Pi-Python-Code.git

If you need to enable i2c follow this guide: Configuring I2C






A few important notes about this code.

  • A thread is used to constantly read the potentiometer. If you take the reading in-line in the scene update method then you'll slow down the screen refresh rate.
  • The PotReader class is given a reference to the PiTft class in order to pass data.
  • The ui.Scene class (PiTft) is instantiated after the call to ui.init - if you do this the other way around it will fail.
  • A signal handler is used to trap ctrl+c and terminate the PotReader thread before calling sys.exit - otherwise the program will not close.

import sys
sys.path.append('/home/pi/Adafruit-Raspberry-Pi-Python-Code/Adafruit_ADS1x15')

import pygame
import os
import pygameui as ui
import logging
import RPi.GPIO as GPIO
import signal
from Adafruit_ADS1x15 import ADS1x15
import threading
import time

ADS1015 = 0x00  # 12-bit ADC
ADS1115 = 0x01  # 16-bit ADC

# Select the gain
# gain = 6144  # +/- 6.144V
gain = 4096  # +/- 4.096V
# gain = 2048  # +/- 2.048V
# gain = 1024  # +/- 1.024V
# gain = 512   # +/- 0.512V
# gain = 256   # +/- 0.256V

# Select the sample rate
sps = 8    # 8 samples per second
# sps = 16   # 16 samples per second
# sps = 32   # 32 samples per second
# sps = 64   # 64 samples per second
# sps = 128  # 128 samples per second
# sps = 250  # 250 samples per second
# sps = 475  # 475 samples per second
# sps = 860  # 860 samples per second

# Initialise the ADC using the default mode (use default I2C address)
# Set this to ADS1015 or ADS1115 depending on the ADC you are using!
adc = ADS1x15(ic=ADS1115)

#Setup the GPIOs as outputs - only 4 and 17 are available
GPIO.setmode(GPIO.BCM)
GPIO.setup(4, GPIO.OUT)
GPIO.setup(17, GPIO.OUT)

log_format = '%(asctime)-6s: %(name)s - %(levelname)s - %(message)s'
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(log_format))
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)

os.putenv('SDL_FBDEV', '/dev/fb1')
os.putenv('SDL_MOUSEDRV', 'TSLIB')
os.putenv('SDL_MOUSEDEV', '/dev/input/touchscreen')

MARGIN = 20

class PotReader():
    def __init__(self, pitft):
        self.pitft = pitft
        self.terminated = False
        
    def terminate(self):
        self.terminated = True
        
    def __call__(self):
        while not self.terminated:
            # Read channel 0 in single-ended mode using the settings above
            volts = adc.readADCSingleEnded(0, gain, sps) / 1000
            self.pitft.set_volts_label(volts)
            self.pitft.set_progress(volts / 3.3)

class PiTft(ui.Scene):
    def __init__(self):
        ui.Scene.__init__(self)

        self.on17_button = ui.Button(ui.Rect(MARGIN, MARGIN, 130, 60), '17 on')
        self.on17_button.on_clicked.connect(self.gpi_button)
        self.add_child(self.on17_button)

        self.on4_button = ui.Button(ui.Rect(170, MARGIN, 130, 60), '4 on')
        self.on4_button.on_clicked.connect(self.gpi_button)
        self.add_child(self.on4_button)

        self.off17_button = ui.Button(ui.Rect(MARGIN, 100, 130, 60), '17 off')
        self.off17_button.on_clicked.connect(self.gpi_button)
        self.add_child(self.off17_button)

        self.off4_button = ui.Button(ui.Rect(170, 100, 130, 60), '4 off')
        self.off4_button.on_clicked.connect(self.gpi_button)
        self.add_child(self.off4_button)

        self.progress_view = ui.ProgressView(ui.Rect(MARGIN, 200, 280, 40))
        self.add_child(self.progress_view)

        self.volts_value = ui.Label(ui.Rect(135, 170, 50, 30), '')
        self.add_child(self.volts_value)

    def gpi_button(self, btn, mbtn):
        logger.info(btn.text)
        
        if btn.text == '17 on':
            GPIO.output(17, False)
        elif btn.text == '4 on':
            GPIO.output(4, False)
        elif btn.text == '17 off':
            GPIO.output(17, True)
        elif btn.text == '4 off':
            GPIO.output(4, True)

    def set_progress(self, percent):
        self.progress_view.progress = percent
        
    def set_volts_label(self, volts):
        self.volts_value.text = '%.2f' % volts

    def update(self, dt):
        ui.Scene.update(self, dt)


ui.init('Raspberry Pi UI', (320, 240))
pygame.mouse.set_visible(False)

pitft = PiTft()

# Start the thread running the callable
potreader = PotReader(pitft)
threading.Thread(target=potreader).start()

def signal_handler(signal, frame):
    print 'You pressed Ctrl+C!'
    potreader.terminate()
    sys.exit(0)
        
signal.signal(signal.SIGINT, signal_handler)

ui.scene.push(pitft)
ui.run()

2014-06-01

Raspberry Pi Tweet Controlled RGB LCD

I've written another article over on Tuts.

The article shows how to use a Raspberry Pi with an RGB LCD to monitor tweets. Tweets containing specific keywords are displayed in defined colours. It explains how to create a Twitter application to use the stream API to push data to a multi-threaded Python program.


A second follow-up article to "How to Build a Tweet Controlled RGB LCD" is planned. This will add a web interface to the program so you can change what you're following and the colour mappings.

2013-09-07

Build a Raspberry Pi Moisture Sensor to Monitor Your Plants

*** NOTE: ControlMyPi shutting down ***

Here's a snippet from a detailed tutorial I've written for Tuts+

...You will be able to monitor the sensor locally on the LCD or remotely, via ControlMyPi.com, and receive daily emails if the moisture drops below a specified level.

Along the way I will:

  • wire up and read a value from an analog sensor over SPI using a breadboard
  • format the sensor reading nicely in the console
  • display the sensor reading on an RGB LCD display
  • have the Raspberry Pi send an email with the sensor reading
  • easily monitor the sensor and some historic readings on the web

Read the whole tutorial here: Build a Raspberry Pi Moisture Sensor to Monitor Your Plants


2013-03-21

Raspberry Pi parking camera with distance sensor

This build brings together a few other projects to make something potentially quite useful for a change - a parking camera with distance sensor. The feed from the webcam is shown on the LCD with a distance read-out underneath. As you get closer to the object (my hand in the videos) a circle is overlaid on the video which gets larger as you move closer. Once you get to within 30cm of the object the word "STOP" is overlaid and everything turns red.

Here it is in action:

And a close-up of the screen:


The project is made up of the following:

Camera: Microsoft Lifecam Cinema - I've used this with lots of Raspberry Pi projects - it works nicely and has a good microphone too.

Distance sensor: Sharp GP2Y0A02YK0F - my article Raspberry Pi distance measuring sensor with LCD output explains how to put this together.

I'm also using an Adafruit Pi Cobbler to breakout the header onto the breadboard.




For the software I'm using Pygame. Thankfully the camera supports a 176x144 resolution and since my screen is 176x220 this fits perfectly. So, after some initializing there is a main loop which simply blits the image from the camera, reads from the distance sensor, and draws the circle and text. Finally update() is called to send this to the framebuffer.

import pygame
import pygame.camera
import os
import mcp3008

BLACK = 0,0,0
GREEN = 0,255,0
RED = 255,0,0

if not os.getenv('SDL_FBDEV'):
    os.putenv('SDL_FBDEV', '/dev/fb1')

if not os.getenv('SDL_VIDEODRIVER'):
    os.putenv('SDL_VIDEODRIVER', 'fbcon')

pygame.init()
lcd = pygame.display.set_mode((176, 220))
pygame.mouse.set_visible(False)
lcd.fill(BLACK)
pygame.display.update()

pygame.camera.init()
 
size = (176,144)
cam = pygame.camera.Camera('/dev/video0', size, 'RGB')

cam.start()

font_big = pygame.font.Font(None, 50)
surf = pygame.Surface(size)
while True:
    lcd.fill(BLACK)
    cam.get_image(surf)
    lcd.blit(surf, (0,0))

    cm = mcp3008.read_2Y0A02_sensor(7)
    colour = GREEN
    if cm < 30:
        colour = RED
        text_surface = font_big.render('STOP', True, colour)
        rect = text_surface.get_rect(center=(88,72))
        lcd.blit(text_surface, rect)

    if cm < 140:
        pygame.draw.circle(lcd, colour, (88,72), (150-cm)/2, 3)
    
    text_surface = font_big.render('%dcm'%cm, True, colour)
    rect = text_surface.get_rect(center=(88,180))
    lcd.blit(text_surface, rect)

    pygame.display.update()


Finally here's the mcp3008 module which is imported above. NOTE: Since the LCD is using SPI 0.0 I have used SPI 0.1 for the mcp3008. You'll see this in the code below:

import spidev

spi = spidev.SpiDev()
spi.open(0,1)

# read SPI data from MCP3008 chip, 8 possible adc's (0 thru 7)
def readadc(adcnum):
    if ((adcnum > 7) or (adcnum < 0)):
        return -1
    r = spi.xfer2([1,(8+adcnum)<<4,0])
    adcout = ((r[1]&3) << 8) + r[2]
    return adcout

def read_3v3(adcnum):
    r = readadc(adcnum)
    v = (r/1023.0)*3.3
    return v

def read_2Y0A02_sensor(adcnum):
    r = []
    for i in range (0,10):
        r.append(readadc(adcnum))
    a = sum(r)/10.0
    v = (a/1023.0)*3.3
    d = 16.2537 * v**4 - 129.893 * v**3 + 382.268 * v**2 - 512.611 * v + 306.439
    cm = int(round(d))
    return cm


2013-03-11

Raspberry Pi system monitor embedded on your own site


*** NOTE: ControlMyPi shutting down ***

Above is an embedded ControlMyPi panel showing some system stats from my Raspberry Pi.

If you want to run one of these yourself set up your Raspberry Pi for ControlMyPi by following the instructions on the site and then run the script below (after changing it to use your account and password).

To embed it on your site use an iframe using the instructions on the ControlMyPi FAQ.

If you want this to run automatically every time you boot up just add a line to /etc/rc.local e.g. python /path/to/script/pimonitor.py &

'''
Created on 10 Mar 2013

ControlMyPi Raspberry Pi system monitor. See www.controlmypi.com.

@author: Jeremy Blythe
'''

import subprocess
import logging
import time
from controlmypi import ControlMyPi
        
def get_ip_address(interface):
    "Returns the IP address for the given interface e.g. eth0"
    try:
        s = subprocess.check_output(["ip","addr","show",interface])
        return s.split('\n')[2].strip().split(' ')[1].split('/')[0]
    except:
        return '?.?.?.?'

def get_ram():
    "Returns a tuple (total ram, available ram) in megabytes. See www.linuxatemyram.com"
    try:
        s = subprocess.check_output(["free","-m"])
        lines = s.split('\n')        
        return ( int(lines[1].split()[1]), int(lines[2].split()[3]) )
    except:
        return (0, 0)  # keep the (total, available) tuple shape on failure

def get_process_count():
    "Returns the number of processes"
    try:
        s = subprocess.check_output(["ps","-e"])
        return len(s.split('\n'))        
    except:
        return 0

def get_up_stats():
    "Returns a tuple (uptime, 5 min load average)"
    try:
        s = subprocess.check_output(["uptime"])
        load_split = s.split('load average: ')
        load_five = float(load_split[1].split(',')[1])
        up = load_split[0]
        up_pos = up.rfind(',',0,len(up)-4)
        up = up[:up_pos].split('up ')[1]
        return ( up , load_five )        
    except:
        return ( '' , 0 )

def get_connections():
    "Returns the number of network connections"
    try:
        s = subprocess.check_output(["netstat","-tun"])
        return len([x for x in s.split() if x == 'ESTABLISHED'])
    except:
        return 0
    
def get_temperature():
    "Returns the temperature in degrees C"
    try:
        s = subprocess.check_output(["/opt/vc/bin/vcgencmd","measure_temp"])
        return float(s.split('=')[1][:-3])
    except:
        return 0

def on_msg(conn, key, value):
    pass

if __name__ == '__main__':
    logging.basicConfig(level=logging.ERROR)

    total_ram = get_ram()[0]
    
    p = [ 
        [ ['O'] ],
        [ ['L','Up time'],['S','up',''] ],
        [ ['L','Processes'],['S','pcount',''] ],
        [ ['L','Connections'],['S','ncount',''] ],
        [ ['C'] ],
        [ ['O'] ],
        [ ['G','ram','free Mb',0,0,total_ram],['G','load','load',0,0,4],['G','temp',u'\xB0C',0,0,80] ], 
        [ ['C'] ]
        ]

    conn = ControlMyPi('you@yours.com', 'password', 'pimonitor', 'Pi system monitor', p, on_msg)
    if conn.start_control():
        try:
            status = {'ram':0, 'temp':0, 'load':0, 'pcount':0, 'ncount':0, 'up':''}
            while True:
                to_send = {}
                
                to_send['ram'] = get_ram()[1]
                to_send['temp'] = get_temperature()
                up, load = get_up_stats()
                to_send['load'] = load
                to_send['pcount'] = get_process_count()
                to_send['ncount'] = get_connections()
                to_send['up'] = up
                
                for k,v in to_send.items():
                    if status[k] == v:
                        del to_send[k]
                
                if len(to_send) > 0:
                    status.update(to_send)
                    conn.update_status(to_send)
                    
                time.sleep(30)
        finally:
            conn.stop_control()

2013-03-09

Raspberry Pi midi driven solenoid bell

This is completely pointless but a bit of fun I had to share. I've been thinking about hooking my Roland TD9 v-drum kit up to a Raspberry Pi for a while for another project so I bought a really cheap Midi to USB gadget: USB Midi Cable Lead Adaptor

To my surprise this worked out of the box, nothing to install. I made sure my Raspbian OS was up to date before I started but that was it. I have never done anything with Midi before but I knew it was a simple serial protocol and so assumed I'd be able to open some kind of tty device. In fact the ALSA driver on the OS detects it correctly as a USB-Midi input/output device. You can see this by running the amidi command:
pi@raspberrypi ~ $ amidi -l
Dir Device    Name
IO  hw:1,0,0  USB2.0-MIDI MIDI 1
 O  hw:1,0,1  USB2.0-MIDI MIDI 2

There are probably loads of proper Midi tools you can use since it has been discovered correctly, but I just wanted to look at the raw bytes coming in. The device node that's been created in the file system can be found in /dev/snd:
pi@raspberrypi ~ $ ls /dev/snd
by-id  by-path  controlC0  controlC1  midiC1D0  pcmC0D0p  seq  timer

So all my program has to do is read bytes from /dev/snd/midiC1D0. To get this to work I didn't need to understand or decode much of the protocol - I'm basically just looking for a sequence when an "instrument" is hit. In this case it is a sequence (in hex) of 99 XX where XX is the instrument code, 26 for snare drum, 24 for kick drum etc. There's a lot more going on but I can ignore the rest apart from a useful continual pulse of FE. This is known as "Active sense" and you get this once every 300ms. I'm using this to switch the GPIO off again as you'll see in the code later. In order to ring the bell you need a quick on/off motion. You can read more about the solenoid bell in my previous post: Raspberry Pi solenoid alarm bell.

Anyway, here's a rather shaky video (sorry) and the code. Enjoy!


import RPi.GPIO as GPIO

inst = {
        '\x26':'snare',
        '\x28':'snare rim',
        '\x30':'tom1',
        '\x32':'tom1 rim',
        '\x2d':'tom2',
        '\x2f':'tom2 rim',
        '\x2b':'tom3',
        '\x3a':'tom3 rim',
        '\x24':'kick',
        '\x1a':'hi-hat rim',
        '\x2e':'hi-hat head',
        '\x2c':'hi-hat close',
        '\x31':'crash head',
        '\x37':'crash rim',
        '\x33':'ride head',
        '\x3b':'ride rim',
        '\x35':'ride bell',
}

f=open('/dev/snd/midiC1D0')

note=False

GPIO_NUM = 18
GPIO.setmode(GPIO.BCM)
GPIO.setup(GPIO_NUM, GPIO.OUT)

while True:
        b = f.read(1)
        if b == '\xfe':
                GPIO.output(GPIO_NUM, False)
        else:
#               if b == '\x40':
#                       print hex(ord(b))
#               else:
#                       print hex(ord(b)),
                if b == '\x99':
                        note=True
                elif note:
                        if b in inst:
                                print inst[b]
                                if b == '\x26':
                                        GPIO.output(GPIO_NUM, True)
                        else:
                                print hex(ord(b))
                        note=False

2013-02-24

Live Web Bicycle Dashboard - the code

*** NOTE: ControlMyPi shutting down ***

This post is a walkthrough of the code running on the Raspberry Pi as seen in the previous post: Live Web Bicycle Dashboard using ControlMyPi. This file, and the required mcp3008.py, are available in the examples from ControlMyPi. See "How to connect your pi".

Firstly at the top of the file are a few constants to use with ControlMyPi. The PANEL_FORM defines what ControlMyPi will render on the web site. Each update-able widget has a name so we can push changes up to ControlMyPi as new data is read from the attached devices. For example the 'P','streetview' widget defines a Picture widget. Whenever we want to display a new streetview image we can push the URL up to ControlMyPi and it in turn will push this change out to any browser currently viewing the page.

The last line of the panel defines two buttons and a status text widget. These are used to start and stop recording the telemetry to a file. More about this at the end of this post.

For information about all the available widgets in ControlMyPi go to here: ControlMyPi docs
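As a flavour of how the push works: once connected, updating the dashboard is just a case of handing ControlMyPi a dict mapping widget names (from the panel definition below) to new values, something like this (conn here is the ControlMyPi connection object created further down the file, and the values are just illustrative):

# Push new values to named widgets - ControlMyPi relays them to any browser viewing the page
conn.update_status({'speed': 17, 'locked': 'GPS locked'})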

The last few constants in this section define the URLs used for the Google Maps Image APIs. %s substitutions are defined in these strings so we can apply the latitude, longitude and heading later on. An API_KEY constant is also defined here. You can comment this out initially to test, but you will need to get a key eventually as the quota for image fetches is quite low without one. With a key you get 25000 per day for free.

'''
Created on 6 Nov 2012

Bicycle telemetry recorder with Live web dashboard through ControlMyPi.com

See: http://jeremyblythe.blogspot.com
     http://www.controlmypi.com
     Follow me on Twitter for updates: @jerbly
     
@author: Jeremy Blythe
'''

import serial
import subprocess
import mcp3008
import time
from controlmypi import ControlMyPi

JABBER_ID = 'you@your.jabber.host'
JABBER_PASSWORD = 'yourpassword'
SHORT_ID = 'bicycle'
FRIENDLY_NAME = 'Bicycle telemetry system'
PANEL_FORM = [
             [ ['S','locked',''] ],
             [ ['O'] ],
             [ ['P','streetview',''],['P','map',''] ],
             [ ['C'] ],
             [ ['O'] ],
             [ ['L','Speed'],['G','speed','mph',0,0,50], ['L','Height'],['S','height',''] ],
             [ ['C'] ],
             [ ['L','Accelerations'] ],
             [ ['G','accx','X',0,-3,3], ['G','accy','Y',0,-3,3], ['G','accz','Z',1,-3,3] ],
             [ ['L','Trace file'],['B','start_button','Start'],['B','stop_button','Stop'],['S','recording_state','-'] ]
             ]

API_KEY = '&key=YOUR_API_KEY'
STREET_VIEW_URL = 'http://maps.googleapis.com/maps/api/streetview?size=360x300&location=%s,%s&fov=60&heading=%s&pitch=0&sensor=true'+API_KEY
MAP_URL = 'http://maps.googleapis.com/maps/api/staticmap?center=%s,%s&zoom=15&size=360x300&sensor=true&markers=%s,%s'+API_KEY
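To make the substitution order concrete, here's what the two strings expand to for an illustrative latitude, longitude and heading (values made up for the example, and assuming the placeholder API key above):

# Illustrative fix: latitude, longitude, heading
slat, slon, heading = '51.476292', '-0.975117', '103.43'
print(STREET_VIEW_URL % (slat, slon, heading))
# http://maps.googleapis.com/maps/api/streetview?size=360x300&location=51.476292,-0.975117&fov=60&heading=103.43&pitch=0&sensor=true&key=YOUR_API_KEY
print(MAP_URL % (slat, slon, slat, slon))
# http://maps.googleapis.com/maps/api/staticmap?center=51.476292,-0.975117&zoom=15&size=360x300&sensor=true&markers=51.476292,-0.975117&key=YOUR_API_KEY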

The GPS class.

The constructor goes through a process of setting the GPS unit into 38400 baud and 10Hz update mode. During testing I noticed that if you don't increase the baud rate to 38400 (up from 9600) then the unit won't go into 10Hz mode - presumably because there's too much data to get out per second at 9600 baud. Finally I set the unit to only produce RMC and GGA messages - everything but the height is available in the RMC message.

Every time the read method is called on this class a single line is read from the GPS unit. If an RMC or GGA message is found, the info is decoded and the member variables are updated. As you'll see later, the design of this whole application hinges around the GPS: the program basically runs as fast as the GPS produces output. Since the unit is in 10Hz mode we collect all the data for an update and then there's a short delay until the next set of data, so all this happens 10 times per second. During the "data gaps" the serial port's 0.01 second timeout comes into play to stop the main loop from blocking, allowing us to do things like read key presses from the TextStar display.

class GPS:
    def __init__(self):
        self.height = '0'
        self.time_stamp = ''
        self.active = False
        self.lat = None
        self.lat_dir = None
        self.lon = None
        self.lon_dir = None
        self.speed = None
        self.heading = None
        self.date = ''
        # Connect to GPS at default 9600 baud
        self.ser = serial.Serial('/dev/ttyAMA0',9600,timeout=0.01)
        # Switch GPS to faster baud
        self.send_and_get_ack('251',',38400')
        # Assume success - close and re-open serial port at new speed
        self.ser.close()
        self.ser = serial.Serial('/dev/ttyAMA0',38400,timeout=0.01)
        # Set GPS into RMC and GGA only mode
        self.send_and_get_ack('314',',0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0')
        # Set GPS into 10Hz mode
        self.send_and_get_ack('220',',100')

    def checksum(self,cmd):
        calc_cksum = 0
        for s in cmd:
            calc_cksum ^= ord(s)
        return '$'+cmd+'*'+hex(calc_cksum)[2:]

    def send_and_get_ack(self,cmdno,cmdstr):
        '''Send the cmd and wait for the ack'''
        #$PMTK001,604,3*32
        #PMTK001,Cmd,Flag
        #Cmd:  the command / packet type the acknowledgement responds to
        #Flag: 0 = invalid command / packet
        #      1 = unsupported command / packet type
        #      2 = valid command / packet, but action failed
        #      3 = valid command / packet, and action succeeded
        cmd = 'PMTK%s%s' % (cmdno,cmdstr)
        msg = self.checksum(cmd)+chr(13)+chr(10)
        #print '>>>%s' % cmd
        self.ser.write(msg)
        ack = False
        timeout = 300
        while (not ack) and timeout > 0:
            line = str(self.ser.readline())
            if line.startswith('$PMTK001'):
                tokens = line.split(',')
                ack = tokens[2][0] == '3'
                #print '<<<%s success=%s' % (line,ack)
            timeout -= 1
        return ack

    def read(self):
        '''Read the GPS'''
        line = str(self.ser.readline())
        #print line
    
        if line.startswith('$GPGGA'):
            # $GPGGA,210612.300,5128.5791,N,00058.5165,W,1,8,1.18,41.9,M,47.3,M,,*79
            # 9 = Height in metres
            tokens = line.split(',')
            if len(tokens) < 15:
                return
            try:
                self.height = tokens[9]
            except ValueError as e:
                print e    
        elif line.startswith('$GPRMC'):
            # $GPRMC,105215.000,A,5128.5775,N,00058.5070,W,0.12,103.43,211012,,,A*78
            # 1 = Time
            # 2 = (A)ctive or (V)oid
            # 3 = Latitude
            # 5 = Longitude
            # 7 = Speed in knots
            # 8 = Compass heading
            # 9 = Date
            #Divide minutes by 60 and add to degrees. West and South = negative
            #Multiply knots by 1.15078 to get mph.
            tokens = line.split(',')
            if len(tokens) < 10:
                return
            try:
                self.time_stamp = tokens[1]
                self.active = tokens[2] == 'A'
                self.lat = tokens[3]
                self.lat_dir = tokens[4]
                self.lon = tokens[5]
                self.lon_dir = tokens[6]
                self.speed = tokens[7]
                self.heading = tokens[8]
                self.date = tokens[9]
                if self.active:
                    self.lat = float(self.lat[:2]) + float(self.lat[2:])/60.0
                    if self.lat_dir == 'S':
                        self.lat = -self.lat
                    self.lon = float(self.lon[:3]) + float(self.lon[3:])/60.0
                    if self.lon_dir == 'W':
                        self.lon = -self.lon
                    self.speed = float(self.speed) * 1.15078
            except ValueError as e:
                print e
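Before moving on, here's a standalone check of two of the conversions in this class - the PMTK checksum wrapping and the ddmm.mmmm to decimal degrees maths - using the sample values from the comments above (illustrative only, not part of the main script):

# PMTK checksum: XOR every character between '$' and '*'
cmd = 'PMTK220,100'
cksum = 0
for c in cmd:
    cksum ^= ord(c)
print('$' + cmd + '*' + hex(cksum)[2:])     # $PMTK220,100*2f - the 10Hz command

# ddmm.mmmm / dddmm.mmmm to decimal degrees, from the sample RMC sentence
lat = float('5128.5775'[:2]) + float('5128.5775'[2:]) / 60.0       # N stays positive
lon = -(float('00058.5070'[:3]) + float('00058.5070'[3:]) / 60.0)  # W is negative
print('%.6f %.6f' % (lat, lon))             # 51.476292 -0.975117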

The TextStar class.

The TextStar serial LCD is a handy little device, not just for the 16x2 display but also for the 4 buttons around the edge of the display which can be used for input. Normally I connect this straight into the serial pins on the Raspberry Pi, but in this case I have the GPS connected there, so I'm using a USB to TTL serial converter. If you have a standard USB to RS232 converter you can use that too, just be sure to set the TextStar into RS232 mode.

The constructor opens the serial port through the USB converter and throws the first few inputs away. Something I noticed during testing is that when the TextStar starts up it spews out a few characters which could spoil the key reading code later, so the start-up routine reads up to 16 characters after a 3 second delay from opening the port. The "on_rec_button" event is also set up here - this is the method to call if the record toggle button is pressed.

As well as updating the display with the GPS and Accelerometer info, this class displays the currently assigned wired ethernet address and ppp address. This is really useful as it allows you to plug into a network and easily find the address you've been assigned so you can then ssh in. It's also a confidence check that the 3G connection is working, since you'll see the ppp address.

The key reading routine uses the 'a' button to rotate through the pages of info and the 'c' button to call the 'on_rec_button' event which is used to toggle recording.
        
class TextStar:
    def __init__(self, on_rec_button):
        self.LCD_UPDATE_DELAY = 5
        self.lcd_update = 0
        self.page = 0
        self.ser = serial.Serial('/dev/ttyUSB0',115200,timeout=0.01)
        # Throw away first few key presses after waiting for the screen to start up
        time.sleep(3)
        self.ser.read(16)
        self.on_rec_button = on_rec_button
    
    def get_addr(self,interface):
        try:
            s = subprocess.check_output(["ip","addr","show",interface])
            return s.split('\n')[2].strip().split(' ')[1].split('/')[0]
        except:
            return '?.?.?.?'
    
    def write_ip_addresses(self):
        self.ser.write(chr(254)+'P'+chr(1)+chr(1))
        self.ser.write('e'+self.get_addr('eth0').rjust(15)+'p'+self.get_addr('ppp0').rjust(15))

    def update(self,gps,acc,rec):
        self.lcd_update += 1
        if self.lcd_update > self.LCD_UPDATE_DELAY:
            self.lcd_update = 0
            self.ser.write(chr(254)+'P'+chr(1)+chr(1))
            
            if not gps.active and (self.page == 0 or self.page == 1):
                self.ser.write('NO FIX: '+gps.date+' '+rec.recording)
                self.ser.write(gps.time_stamp.ljust(16))
            elif self.page == 0:
                self.ser.write(('%.8f' % gps.lat).rjust(14)+" "+rec.recording)
                self.ser.write(('%.8f' % gps.lon).rjust(14)+"  ")
            elif self.page == 1:
                #0.069 223.03 48.9 -0.010 0.010 0.980
                self.ser.write('{: .3f}{:>9} '.format(gps.speed,gps.height))
                if acc:
                    self.ser.write('{: .2f}{: .2f}{: .2f}'.format(*acc))
                else:
                    self.ser.write(' '*16)

    def read_key(self):
        key = str(self.ser.read(1))
        if key != '' and key in 'abcd':
            self.lcd_update = self.LCD_UPDATE_DELAY
            if key == 'c':
                self.on_rec_button()
            elif key == 'a':
                self.page += 1
                if self.page > 2:
                    self.page = 0
                elif self.page == 2:
                    self.write_ip_addresses()
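As a quick sanity check of the speed/height line on page 1, the format string pads the example values from the comment above to exactly 16 characters - one full line of the display:

# ' 0.069     48.9 ' - 6 chars of speed, 9 of height, 1 trailing space = 16
print(repr('{: .3f}{:>9} '.format(0.069, '48.9')))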

The Recorder class

The current status is logged to file every time there's a new reading from the GPS. So that's 10 times per second. If the Accelerometer is not used then dashes replace the X,Y and Z readings. Likewise if there is no GPS lock only the date and time is logged and dashes replace the rest of the data.

The recorder has a reference to the ControlMyPi connection and uses this to push an update showing the recording state. This is either "Recording" or "Stopped" and when it's stopped the generated file name is shown.
        
class Recorder:
    def __init__(self,gps,cmp):
        self.gps = gps
        self.cmp = cmp
        self.recording = 's'
        self.rec_file = None

    def start(self):
        if self.recording == 's':
            self.recording = 'r'
            self.rec_file = open("/home/pi/gps-"+self.gps.date+self.gps.time_stamp+".log", "a")
            self.cmp.update_status( {'recording_state':'Recording'} )
    
    def stop(self):
        if self.recording == 'r':
            self.recording = 's'
            self.rec_file.close()
            self.cmp.update_status( {'recording_state':'Stopped - [%s]' % self.rec_file.name} )

    def update(self,acc):
        if self.recording == 'r':
            if acc:
                acc_str = '%.2f %.2f %.2f' % acc
            else:
                acc_str = '- - -'
                
            if self.gps.active:
                self.rec_file.write('%s %s %.8f %.8f %.3f %s %s %s\n' % (self.gps.date, self.gps.time_stamp, self.gps.lat, self.gps.lon, self.gps.speed, self.gps.heading, self.gps.height, acc_str))
            else:
                self.rec_file.write('%s %s - - - - - %s\n' % (self.gps.date, self.gps.time_stamp, acc_str))
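Putting that together, a single trace line ends up looking something like this (values here are just illustrative, based on the sample NMEA sentences earlier):

# date time lat lon speed(mph) heading height(m) accx accy accz
print('%s %s %.8f %.8f %.3f %s %s %s' % (
    '211012', '105215.000', 51.47629167, -0.97511667, 0.138, '103.43', '41.9', '-0.01 0.01 0.98'))
# 211012 105215.000 51.47629167 -0.97511667 0.138 103.43 41.9 -0.01 0.01 0.98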

The Accelerometer class

The MCP3008 is used to read the three voltages from the 3-axis accelerometer, convert them to digital readings and retrieve them over SPI. Details of this technique are written up here: Raspberry Pi hardware SPI analog inputs using the MCP3008. As the comment in the code states, there's a little tuning to be done to get good readings.

class Accelerometer:
    def read_accelerometer(self):
        '''Read the 3 axis accelerometer using the MCP3008. 
           Each axis is tuned to show +1g when oriented towards the ground, this will be different
           for everyone and dependent on physical factors - mostly how flat it's mounted.
           The result is rounded to 2 decimal places as there is too much noise to be more
           accurate than this.
           Returns a tuple (X,Y,Z).'''  
        x = mcp3008.readadc(0)
        y = mcp3008.readadc(1)
        z = mcp3008.readadc(2)
        return ( round((x-504)/102.0,2) , round((y-507)/105.0,2) , round((z-515)/102.0,2) )
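In other words, with my tuning numbers the X axis works out at roughly 102 ADC counts per g with a zero-g offset of about 504 counts, so:

# Illustrative raw counts for the X axis with the tuning above
print(round((504 - 504) / 102.0, 2))    #  0.0 g - axis level
print(round((606 - 504) / 102.0, 2))    #  1.0 g - axis pointing at the ground
print(round((402 - 504) / 102.0, 2))    # -1.0 g - axis pointing at the sky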

Construction and Events

In this section the objects are created and a couple of call-back events are assigned. If you don't have an Accelerometer, set acc to None. Likewise, if you don't have a TextStar LCD, set lcd to None. The two call-backs handle incoming button events from either ControlMyPi or the TextStar keys.

# Start the GPS
gps = GPS()

# Create the Accelerometer object. Change to acc=None if you don't have an accelerometer.
acc = Accelerometer()

# Control My Pi
def on_control_message(conn, key, value):
    if key == 'start_button':    
        rec.start()
    elif key == 'stop_button':
        rec.stop()

conn = ControlMyPi(JABBER_ID, JABBER_PASSWORD, SHORT_ID, FRIENDLY_NAME, PANEL_FORM, on_control_message)

# Recording
rec = Recorder(gps, conn)

def on_rec_button():
    if rec.recording == 's':
        rec.start()
    else:
        rec.stop()    

# Start the TextStar LCD. Change to lcd=None if you don't have a TextStar LCD.
lcd = TextStar(on_rec_button)

The main loop

Finally, after connecting to ControlMyPi, the main loop starts. Here we call the readers and the updaters. We read from the GPS, Accelerometer and TextStar keypad every loop. We keep track of the time stamp we're on and when we move to the next 10th of a second we call the updaters.

The updaters are the LCD, ControlMyPi and the Recorder. The Recorder writes to the file on every change of time stamp provided it's in record mode. The LCD and ControlMyPi write less frequently; a simple counter is used to act only on every n'th update.

You'll notice there is no explicit yield in the main loop. It's quite normal to put a time.sleep(n) into the main loop to stop tight-looping. In this case we're using the blocking serial port reads with their timeouts to yield.

ControlMyPi only needs to be updated with new information: if the GPS is not locked (active) then we don't bother to send the old position again - instead we send a "GPS NOT LOCKED" message. The status dict is simply filled with the widget names that we want to update and the new values. This dict is then sent to ControlMyPi with the call to update_status.

if conn.start_control():
    try:
        conn.update_status( {'recording_state':'Stopped'} )
        # Start main loop
        old_time_stamp = 'old'
        CMP_UPDATE_DELAY = 50
        cmp_update = 0
        
        while True:
            #Read the 3 axis accelerometer
            if acc:
                xyz = acc.read_accelerometer()
            else:
                xyz = None
                
            #Read GPS
            gps.read()
                    
            #Update ControlMyPi, LCD and Recorder if we have a new reading 
            if gps.time_stamp != old_time_stamp:
                if lcd:
                    lcd.update(gps, xyz, rec)      

                # Don't update ControlMyPi every tick, it'll be too much - approx. 5 seconds is
                # about right as it gives the browser time to fetch the streetview and map 
                cmp_update += 1
                if cmp_update > CMP_UPDATE_DELAY:
                    cmp_update = 0
                    status = {}
                    if xyz:
                        status['accx'] = xyz[0]
                        status['accy'] = xyz[1]
                        status['accz'] = xyz[2]
                    
                    if gps.active:
                        status['locked'] = 'GPS locked'
                        slat = str(gps.lat)
                        slon = str(gps.lon)
                        status['streetview'] = STREET_VIEW_URL % (slat,slon,gps.heading)
                        status['map'] = MAP_URL % (slat,slon,slat,slon)
                        status['speed'] = int(round(gps.speed))
                        status['height'] = '{:>9}'.format(gps.height)
                        conn.update_status(status)
                    else:
                        status['locked'] = 'GPS NOT LOCKED'
                        conn.update_status(status)
              
                #Update recorder every tick
                rec.update(xyz)
        
                old_time_stamp = gps.time_stamp
        
            #Read keypad
            if lcd:        
                lcd.read_key()
    finally:
        conn.stop_control()
else:
    print("FAILED TO CONNECT")


That's it! If you're brave enough to try this yourself, note that there are a few points where I've left some commented-out print statements in for debugging. This and some simpler projects are available in a zip on ControlMyPi here: How to connect your pi.

Have fun.