- Introduction
- Where is the code?
- Demo Video
- Prerequisites
- How do I run all this stuff?
- Known issues
- Some basics
- The app
- Routing
- Some common React / Bootstrap UI components
- Registration workflow
- Passenger registration
- Driver registration
- Login React component
- Json Support
- Login route
- Login controller
- CreateJob React component
- JSON Payload
- Push the consumed Job out of the forever frame (Comet functionality in Play backend)
- Have a new RxJs based Observable over the comet based forever frame, and ensure that is working
- Final diagram to help you solidify this section
- ViewJob react component
- So what should the View Job page do?
- What does it look like when run?
- Walking through Kafka Streams interactive queries
- REST Endpoint Facade
- Controller Action
- React front end for ratings
- Login workflow
- CreateJob workflow
- ViewJob workflow
- ViewRating workflow
- Conclusion
Introduction
This article is the culmination of work I have been doing on the train on my way to work over the last 6 months. I have already written quite a lot of blog posts about it, which you can find on the project's home page on my blog: https://sachabarbs.wordpress.com/2017/05/01/madcap-idea/
There are 13 blog posts there, but I thought it would be good to also have this one overall article which covers all of it. The blog posts are more of a sequence of events, each talking about one piece in great detail as I went through it.
These were the blog posts:
- MADCAP IDEA PART 1 : START OF THE CLIENT SIDE PORTION OF THE WEB SITE
- MADCAP IDEA PART 2 : ADDING DI/IOC TO THE CLIENT SIDE FRONT END WEB SITE
- MADCAP IDEA PART 3 : BRINGING PLAY BACK END INTO THE FOLD + SOME BASIC STREAMING
- MADCAP IDEA PART 4 : PROTOTYPING THE SCREENS
- MADCAP IDEA PART 5 : ADDING REACT-ROUTER
- MADCAP IDEA PART 6 : STATIC SCREEN DESIGN
- MADCAP IDEA PART 7 : REGISTRATION/LOGIN BACKEND
- MADCAP IDEA PART 8 : INTERMEDIATE STEP, REST API FOR INTERACTIVE KAFKA STREAM KTABLE QUERIES
- MADCAP IDEA PART 9 : KAFKA STREAMS INTERACTIVE QUERIES
- MADCAP IDEA PART 10 : PLAY FRAMEWORK REACTIVE KAFKA PRODUCER
- MADCAP IDEA PART 11 : FINISHING THE VIEW RATING PAGE
- MADCAP IDEA PART 12 : GETTING THE CREATE JOB TO WORK END-END
- MADCAP IDEA PART 13 : GETTING THE VIEW JOB TO WORK END-END
Special Thanks
I would like to call out a special thanks to a very special man, Peter O'Hanlon, Code Project legend and all round awesome dude. I asked him to review this behemoth of an article, and Pete immediately said yes and found the time to make that happen, despite the fact that he is a family man, runs his own business and has his own cool ideas going on.
Thanks Pete, I salute you.
So just exactly what is it that I was/am talking about?
In essence I want to write a very (pardon the pun) uber simple Uber type app, with the following functional requirements:
- There should be a web interface that a client can use. Clients may be a driver or a pickup client requiring a delivery
- There should be a web interface that a pickup client can use that shows a pickup client location on a map, which the pickup client chooses.
- The pickup client may request a pickup job, in which case drivers that are in the area bid for a job.
- The pickup client location should be visible to a driver on a map
- A driver may bid for a pickup client job, and the bidding driver(s) location should be visible to the pickup client.
- The acceptance of the bidding driver is down to the pickup client
- Once a pickup client accepts a driver, ONLY the assigned driver(s) current map position will be shown to the pickup client
- When a pickup client is happy that they have been picked up by a driver, the pickup client may rate the driver from 1-10, and the driver may also rate the pickup client from 1-10.
- The rating should only be available once a pickup client has marked a job as completed
- A driver or a pickup client should ALWAYS be able to view their previous ratings.
Whilst this may sound like child's play to a lot of you (me included, if I stuck to using simple CRUD operations), I just want to point out that this app is meant as a learning experience, so I will not be using a simple SignalR Hub and a couple of database tables.
I have written this project using a completely different set of technologies from the norm. Some of the technology choices could easily scale to hundreds of thousands of requests per second (Kafka has your back here)
TECHNOLOGIES INVOLVED
- WebPack
- React
- React Router
- PowerShell
- TypeScript
- Babel.js
- Akka
- Scala
- Play (Scala Http Stack)
- MongoDB
- SBT
- Kafka
- Kafka Streams
They say a picture is worth a thousand words, so here is a nice picture to get things started
Now before we carry on, let me just acknowledge that www.codeproject.com is mainly biased towards Microsoft tech and this code is mainly Scala/TypeScript. However I think there is still plenty to learn along the way, so don't let the fact it's not .NET/C/C++ put you off
Where is the code?
The code for this lot is here : https://github.com/sachabarber/MadCapIdea
Demo Video
CLICK THE IMAGE TO LAUNCH THE VIDEO
Prerequisites
As shown in the introduction section, there are many moving pieces to this demo so, naturally, there are quite a few dependencies. I did try to get it to work in Docker for you, however I found that I still needed to create external scripts to orchestrate it all anyway. In the end I just went with downloading the dependencies listed below, and providing a single PowerShell script that runs most of it, apart from the 2 Scala projects.
This is the list of stuff you will need in order to run this code:
This has all been developed on Windows, so these instructions are all about how to get stuff working on Windows
- MongoDB : https://www.mongodb.com/dr/fastdl.mongodb.org/win32/mongodb-win32-x86_64-2008plus-ssl-3.4.6-signed.msi/download
- Confluent Platform 3.3.0 Open Source : http://packages.confluent.io/archive/3.3/confluent-oss-3.3.0-2.11.zip
- SBT
- Java 8 SDK
- Webpack
- Node.Js
- NPM
- IntelliJ IDEA v17.0 community
- PowerShell
Once you have downloaded all of this you will need to do a few things in order to run it nicely on Windows.
- Download the dependencies above (Keep a note of where you downloaded them as you will need them here and later)
- Replace the official YOUR extract location\confluent-3.3.0\bin\windows BAT files with the ones found here : https://github.com/renukaradhya/confluentplatform/tree/master/bin/windows
- Modify the YOUR extract location\confluent-3.3.0\etc\kafka\zookeeper.properties file to change the dataDir to something like dataDir=c:/temp/zookeeper
- Modify the YOUR extract location\confluent-3.3.0\etc\kafka\server.properties file to uncomment the line delete.topic.enable=true
- Modify the YOUR extract location\confluent-3.3.0\etc\kafka\server.properties file to change the log.dirs to log.dirs=c:/temp/kafka-logs
You will need to remember some of these paths for the next section too, so just be mindful that you may have to edit the PowerShell script later with some new paths
How do I run all this stuff?
There are quite a few moving pieces to this app, and they all need to be running in order for it to work together.
1. Update node.js dependencies
Make sure you have Node.js and NPM installed, and that webpack is installed globally
- Open a command line, change to the MadCapIdea\PlayBackEndApi\FrontEndWebSite\ folder and run npm install
- Now run webpack from the same folder
2. Kafka/Zookeeper etc etc
You can run the following PowerShell script to get all the prerequisites up and running (assuming you have downloaded them all)
- Open a PowerShell command line, change to the PowerShellProject\PowerShellProject\ folder and run .\RunPipeline.ps1. Make sure you modify the paths in the top section of the script to match your own installation paths
3. Play application
- Open the SBT/Scala project inside IntelliJ IDEA (you will need the SBT plugin, and Java 8 installed on your machine).
- Open the MadCapIdea\PlayBackEndApi folder and run it. You may need to create a run time configuration
4. Kafka Streams application
- Open the SBT/Scala project inside IntelliJ IDEA (you will need the SBT plugin, and Java 8 installed on your machine).
- Open the MadCapIdea\KafkaStreams folder and run it. You may need to create a run time configuration where you point to this main class: RatingStreamProcessingApp
5. React
- Open a browser, navigate to http://localhost:9000, and register some users, both passenger and driver
I normally follow this set of steps afterwards
- open a tab, login as a passenger that I had created
- go to the "create job" page, click the map, push the "create job" button
- open a NEW tab, login as a new driver, go to the "view job" page
- on the 1st tab (passenger) click the map to push passenger position to driver
- on the 2nd tab (driver) click the map to push driver position to passenger
- repeat the last 4 steps for an additional driver
- on client tab pick driver to accept, click accept button
- complete the job from client tab, give driver rating
- complete the job from paired driver tab, give passenger rating
- go to "view rating" page, should see ratings
Known issues
The following are known issues
- Once a driver and passenger become paired, the position updates from either are no longer reflected. I am sure this would boil down to a single JavaScript method that needs updating in the ViewJob.tsx file.
- On the ViewJob page I originally wanted the passenger to be able to accept a driver by clicking a button in the driver's Overlay GoogleMap component. However, no matter what I tried, this still caused a MapClick event to happen, which changed the passenger's position. So I had to resort to using a weird drop down select for a passenger to accept a driver. This sucked, but meh
So those are the issues. I just kind of got to the end of a very long road (I have been writing about this on and off for 6 months of train rides, in between not caring at all and just staring into space) and was happy that I got 99% of the stuff done that I set out to do. The app as it is now demonstrates everything I set out to do, so I'm ok with 1-2 known issues.
Some basics
Before we dive into the actual code for the app (and there is quite a bit of it), I thought it would be good to go over some of the individual building blocks that make up the app as a whole. The next few sections will do that. It is a long list, and the demo app makes use of all of it, plus a few more bits that I deemed not important enough to warrant their own sections. We will learn more about each of these when we walk through the actual demo app code; this is more of an overview of the various parts before we look at their specific usage.
What is Kafka?
Overview
Apache Kafka is an open-source stream processing platform developed by the Apache Software Foundation written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Its storage layer is essentially a "massively scalable pub/sub message queue architected as a distributed transaction log," making it highly valuable for enterprise infrastructures to process streaming data. Additionally, Kafka connects to external systems (for data import/export) via Kafka Connect and provides Kafka Streams, a Java stream processing library.
The design is heavily influenced by transaction logs.
Apache Kafka Architecture
Kafka stores messages which come from arbitrarily many processes called "producers". The data can thereby be partitioned in different "partitions" within different "topics". Within a partition the messages are indexed and stored together with a timestamp. Other processes called "consumers" can query messages from partitions. Kafka runs on a cluster of one or more servers and the partitions can be distributed across cluster nodes.
Apache Kafka efficiently processes the real-time and streaming data when used along with Apache Storm, Apache HBase and Apache Spark. Deployed as a cluster on multiple servers, Kafka handles its entire publish and subscribe messaging system with the help of four APIs, namely, producer API, consumer API, streams API and connector API. Its ability to deliver massive streams of message in a fault-tolerant fashion has made it replace some of the conventional messaging systems like JMS, AMQP, etc.
The major terms of Kafka's architecture are topics, records, and brokers. Topics consist of stream of records holding different information. On the other hand, Brokers are responsible for replicating the messages.
- Producer API - Permits the applications to publish streams of records. (covered in this article)
- Consumer API - Permits the application to subscribe to the topics and processes the stream of records. (covered in this article)
- Streams API - Converts the input streams to output and produces the result. (covered in this article)
- Connector API - Executes the reusable producer and consumer APIs that can link the topics to the existing applications. (not covered in this article)
Anatomy of a Kafka Topic
Offset : messages in the partitions are each assigned a unique (per partition) and sequential Id, called the "offset". The "offset" is tracked by consumers, where each consumer tracks via (offset, partition, topic) tuples
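To make the offset idea a bit more concrete, here is a minimal in-memory sketch in TypeScript. It is only a model of the concept and not the Kafka client API: the names PartitionLog, StoredRecord and OffsetTracker are hypothetical, and a real consumer would commit its offsets back to the cluster rather than hold them in a local object.

```typescript
// Hypothetical in-memory model of one Kafka partition (NOT the Kafka API).
interface StoredRecord<T> {
    offset: number;     // sequential id, unique within this partition
    timestamp: number;  // time the record was appended
    value: T;
}

class PartitionLog<T> {
    private records: StoredRecord<T>[] = [];

    // Appends a value and returns the offset it was assigned.
    append(value: T): number {
        const offset = this.records.length;
        this.records.push({ offset: offset, timestamp: Date.now(), value: value });
        return offset;
    }

    // Returns every record at or after the given offset.
    readFrom(offset: number): StoredRecord<T>[] {
        return this.records.slice(offset);
    }
}

// A consumer remembers its position per (topic, partition) pair.
class OffsetTracker {
    private positions: { [key: string]: number } = {};

    private key(topic: string, partition: number): string {
        return topic + "-" + partition;
    }

    // Next offset to read; a brand new consumer starts at 0 in this sketch.
    position(topic: string, partition: number): number {
        const pos = this.positions[this.key(topic, partition)];
        return pos === undefined ? 0 : pos;
    }

    // Notes that everything up to and including `offset` has been processed.
    commit(topic: string, partition: number, offset: number): void {
        this.positions[this.key(topic, partition)] = offset + 1;
    }
}
```

A consumer would call position(...) to find out where to resume, read from the log at that offset, and commit(...) once it has processed a batch. Because the log itself is never modified by reading, several consumers can track different offsets over the same partition.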
Consumer Groups
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.
If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes
A two server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.
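The load balancing versus broadcast behaviour described above can be sketched in a few lines of TypeScript. This is an illustration only and not how Kafka is implemented: GroupMember and deliverTo are hypothetical names, and picking an instance round-robin by offset is an assumption made to keep the sketch small (Kafka actually assigns whole partitions to the instances of a group).

```typescript
// Hypothetical sketch of consumer group delivery (NOT the Kafka client API).
interface GroupMember {
    group: string;     // consumer group name
    instance: string;  // consumer instance id
}

// Returns the instances that receive the record stored at `offset`:
// exactly one instance per consumer group.
function deliverTo(members: GroupMember[], offset: number): string[] {
    const byGroup: { [group: string]: string[] } = {};
    const groupOrder: string[] = [];
    for (const m of members) {
        if (byGroup[m.group] === undefined) {
            byGroup[m.group] = [];
            groupOrder.push(m.group);
        }
        byGroup[m.group].push(m.instance);
    }
    // Assumption of this sketch: rotate through a group's instances by offset.
    return groupOrder.map(g => byGroup[g][offset % byGroup[g].length]);
}
```

If every member shares one group name, each record goes to a single instance and the work is spread across them. If every member has its own group name, every instance receives every record.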
Kafka Performance
Due to its widespread integration into enterprise-level infrastructures, monitoring Kafka performance at scale has become an increasingly important issue. Monitoring end-to-end performance requires tracking metrics from brokers, consumers, and producers, in addition to monitoring ZooKeeper, which is used by Kafka for coordination among consumers.
Source: https://en.wikipedia.org/wiki/Apache_Kafka (as it stood on 02/01/18)
What is Kafka Streams?
The Streams API of Apache Kafka, available through a Java library, can be used to build highly scalable, elastic, fault-tolerant, distributed applications and microservices. First and foremost, the Kafka Streams API allows you to create real-time applications that power your core business. It is the easiest yet the most powerful technology to process data stored in Kafka. It builds upon important concepts for stream processing such as efficient management of application state, fast and efficient aggregations and joins, properly distinguishing between event-time and processing-time, and seamless handling of late-arriving and out-of-order data.
A unique feature of the Kafka Streams API is that the applications you build with it are normal Java applications. These applications can be packaged, deployed, and monitored like any other Java application; there is no need to install separate processing clusters or similar special-purpose and expensive infrastructure!
An application that uses the Kafka Streams API is a normal Java application. Package, deploy, and monitor it like you would do for any other Java application. Even so, your application will be highly scalable, elastic, and fault-tolerant.
Source: https://docs.confluent.io/current/streams/introduction.html (as it stood on 02/01/18)
So that is what the official docs say about it. Here is my take on it
Kafka Streams is an additional API on top of Kafka that allows you to perform aggregation, filtering and time based windowing operations over the incoming messages. The results can either be stored in an internal key-value representation known as a KTable, which uses a state store (based on RocksDB), or you may choose to push the transformed stream values out to a new output topic.
You can perform complex stream processing, merge streams, and also store accumulated stream state.
It is an AWESOME bit of kit
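To give a feel for the "filter, aggregate, then query by key" idea, here is a small TypeScript model of it. This is not the Kafka Streams DSL (which is a Java library); the Rating shape, the field names and both functions are hypothetical and exist only to show a stream of events being folded into a key-value table.

```typescript
// Hypothetical rating event; the field names are illustrative only.
interface Rating {
    toEmail: string;  // who is being rated
    score: number;    // expected to be between 1 and 10
}

// Plays the role of a KTable in this sketch: the current aggregate per key.
type RatingTable = { [email: string]: number[] };

// Folds a stream of events into the table, dropping out-of-range scores.
function aggregateRatings(events: Rating[]): RatingTable {
    const table: RatingTable = {};
    events
        .filter(e => e.score >= 1 && e.score <= 10)
        .forEach(e => {
            const existing = table[e.toEmail] || [];
            table[e.toEmail] = existing.concat([e.score]);
        });
    return table;
}

// Querying the stored state is then just a lookup by key.
function ratingsFor(table: RatingTable, email: string): number[] {
    return table[email] || [];
}
```

The real thing differs in that the table is updated continuously as new messages arrive and is backed by a fault tolerant state store, but the shape of the computation is the same.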
What is Play?
The Play Framework is a Scala based MVC (model view controller) type web application framework. As such it has built-in mechanisms for things typical of an MVC web framework (certainly if you have done any ASP MVC .NET you would find it very familiar).
So we have the typical MVC concerns covered by the Play Framework
- Controllers
- Actions
- Routing
- Model binding
- JSON support
- View engine
Thing is, I will not be doing any actual HTML in the Play Framework back end code; I want to do all of that using the webpack/TypeScript/React front end code. Rather, I will be using the Play Framework as an API backend, where we will simply be using various controllers as endpoints to accept/serve JSON and event streamed data. All the actual front end work/routing will be done via webpack and React.
There are still some very appealing parts in Play that I did want to make use of, such as:
- It is Scala, which means when I come to integrate Kafka / Kafka Streams it will be ready to do so
- It uses Akka which I wanted to use. I also want to use Akka streams, which Play also supports
- Play controllers lend themselves quite nicely to being able to create a fairly simple REST API
- It can be used fairly easily to serve static files (think of these as the final artifacts that come out of the webpack generation pipeline). So things like minimized CSS / JS etc etc
So hopefully you can see that using the Play Framework still made a lot of sense, even if we will only end up using half of what it has to offer. To be honest, the idea of using controllers for a REST API is something that is done in ASP MVC .NET all the time, either by using actual controllers or by using WebApi.
Ok, so now that we know what we will be using the Play Framework for, how about we dive into the code.
Play Framework Basics
Let's start by looking at the bare bones structure of a Play Framework application, which looks like this (I am using IntelliJ IDEA as my IDE)
Let's talk a bit about each of these folders
app
This folder holds controllers/views (I have added the Entities folder there, which is not a Play Framework requirement). Inside the controllers folder you would find controllers, and inside the views folder you would find views. For the final app there will be no views folder; I simply kept it in this screenshot to talk about what a standard Play Framework application looks like
conf
This folder contains the configuration for the Play Framework application. This includes the special routes file, and any other application specific configuration you might have.
Let's just spend a minute having a look at the Play Framework routes file, which you can read more about here : https://www.playframework.com/documentation/2.5.x/ScalaRouting
The routes file has its own DSL, which is responsible for matching a given route with a controller + controller action. The controller action that matches the route is ultimately responsible for servicing the http request. I think the DSL shown in the routes file below is pretty self explanatory, with perhaps the exception of the assets based routes.
All assets based http requests (i.e. ones that start with /assets, for example http://localhost:9000/assets/images/favicon.png) are actually routed through to a special controller called Assets. You don't see any code for this one; it's part of the Play Framework codebase. This special built-in Assets controller is responsible for serving up static files, which it expects to find in the public folder. So for example our initial request of http://localhost:9000/assets/images/favicon.png would get translated into this file (relative path from project root): /public/images/favicon.png. As I say, this is handled for you by the built-in Assets controller.
The only other funky part of the Assets based route is that it uses a *file in its route. This essentially boils down to the Play Framework being able to match a multi-part path, which we actually just saw with the example above, http://localhost:9000/assets/images/favicon.png. See how that contains not only the file name, but also a directory of images. The Assets controller + routing is able to deal with that path just fine.
# Routes
# This file defines all application routes (Higher priority routes first)
# ~~~~

# Home page
GET        /                           controllers.HomeController.index()

GET        /scala/comet/liveClock      controllers.ScalaCometController.streamClock()
GET        /scala/comet/kick           controllers.ScalaCometController.kickRandomTime()

# Map static resources from the /public folder to the /assets URL path
GET        /assets/*file               controllers.Assets.at(path="/public", file)
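To show why the *file part matters, here is a rough TypeScript sketch of how such a pattern could be matched. It is purely illustrative and is not how Play implements its router: matchRoute and resolveAsset are hypothetical functions. The only things taken from Play itself are the conventions that a ":name" part matches a single path segment and a "*name" part matches the rest of the path, slashes included.

```typescript
// Hypothetical matcher for Play-style route patterns (NOT Play's implementation).
// ":name" matches one path segment; "*name" matches the rest of the path.
function matchRoute(pattern: string, path: string): { [name: string]: string } | null {
    const patternParts = pattern.split("/");
    const pathParts = path.split("/");
    const params: { [name: string]: string } = {};

    for (let i = 0; i < patternParts.length; i++) {
        const part = patternParts[i];
        if (i >= pathParts.length) { return null; }
        if (part.charAt(0) === "*") {
            // Multi-part capture: swallow every remaining segment.
            params[part.substring(1)] = pathParts.slice(i).join("/");
            return params;
        }
        if (part.charAt(0) === ":") {
            params[part.substring(1)] = pathParts[i];
        } else if (part !== pathParts[i]) {
            return null;
        }
    }
    // Without a "*" part, the path must not have extra segments left over.
    return patternParts.length === pathParts.length ? params : null;
}

// Maps an /assets request onto a file below the given folder.
function resolveAsset(folder: string, path: string): string | null {
    const match = matchRoute("/assets/*file", path);
    return match === null ? null : folder + "/" + match["file"];
}
```

With this sketch, resolveAsset("/public", "/assets/images/favicon.png") yields "/public/images/favicon.png", which mirrors the translation described above, while a path such as "/scala/comet/kick" does not match the assets pattern at all.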
Ok so moving on to the rest of the standard folders that come with a Play Framework application
public
This is where you will need to put any static content that you wish to be served. Obviously views (if you use that part of Play) will be within the app/views folder. Like I say, I am not using the views aspect of Play, so you will not be seeing any views in my views folder. I instead want to let webpack et al. generate my routing, web page etc etc. I do however want to serve bundles, so later on I will be showing you how my webpack generated bundles fit in with the Play Framework ecosystem.
target
Since this is a scala based project we get the standard scala based folders, and target is one of them, that has the generated/compiled code in it.
SBT
It is worth pointing out that my Play Framework application is an SBT based project, as such there is an SBT aspect to it, which largely boils down to these files
Project [root-build] / plugins.sbt file
This file adds Play as a plugin for the SBT project
// The Lightbend repository
resolvers += Resolver.typesafeRepo("releases")

// Use the Play sbt plugin for Play projects
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.14")
build.sbt
This is the main SBT build file for the project. This is where all our external dependencies are brought in etc etc (standard SBT stuff)
import play.sbt._
import sbt.Keys._
import sbt._

name := "play-streaming-scala"

version := "1.0-SNAPSHOT"

scalaVersion := "2.11.11"

lazy val root = (project in file(".")).enablePlugins(play.sbt.PlayScala)

javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint")

initialize := {
  val _ = initialize.value
  if (sys.props("java.specification.version") != "1.8")
    sys.error("Java 8 is required for this project.")
}
So I think that covers the basics of a standard Play Framework application, the remainder of this article will cover the actual code for the demo project, where we will dive into routes/controllers etc etc
What is Akka
Akka is a great framework for building distributed, fault tolerant apps. It is built around the concept of actors, and has failure in mind from the outset. It also comes with support for HTTP over actors, and comes with Play support out of the box. One thing that also comes with Akka that I make use of is its reactive streams API. This is something like Rx, where you build streams of data that are also fault tolerant, and that also feature back pressure to ensure a consumer is not overburdened by a fast producer. I have written a lot about Akka before, which you can read more about here : https://sachabarbs.wordpress.com/akka-series/ where I cover quite a lot of Akka's functionality
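Back pressure is easier to grasp with a tiny example. The following TypeScript sketch is not the Akka Streams API; DemandSource and drain are hypothetical names. It only models the core idea that the consumer signals how much it can take, and the producer never hands over more than that.

```typescript
// Hypothetical demand-driven source (NOT the Akka Streams API).
class DemandSource<T> {
    private next = 0;
    private items: T[];

    constructor(items: T[]) {
        this.items = items;
    }

    // Emits at most `demand` items, however many are waiting to be sent.
    request(demand: number): T[] {
        const batch = this.items.slice(this.next, this.next + demand);
        this.next += batch.length;
        return batch;
    }
}

// A slow consumer that only ever asks for what it has capacity for.
function drain<T>(source: DemandSource<T>, capacity: number): T[][] {
    const batches: T[][] = [];
    let batch = source.request(capacity);
    while (batch.length > 0) {
        batches.push(batch);
        batch = source.request(capacity);
    }
    return batches;
}
```

Draining a source of five items with a capacity of two produces batches of two, two and one. The producer is never able to flood the consumer, because nothing is emitted without a request for it.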
What is WebPack?
We are headed towards an era where browsers will natively support modules, and dependencies but we are not there yet. Over the years there have been many attempts at JavaScript module management, and dependency tracking of related files, such as
- ASP MVC bundles (fairly simplistic only bundles no other features)
- CommonJs (require syntax, which is fairly intrusive in how your JS files work)
- Browserify (again fairly intrusive in how your JS files are loaded and what they need to manage dependencies)
- NPM (not really a module manager as such more like NuGet for installing packages)
- SystemJS (has import/export syntax but a bit more syntax than Webpack)
- JSPM uses SystemJS (fairly nice but not in as much use as Webpack)
- Various task runners can be used such as Grunt/Gulp to build bundles
All of these have their pros/cons, however I think it is fair to say that WebPack has emerged as the de facto standard module manager for JavaScript (for now anyway). It supports rich dependency graphs and bundles, and also supports things like
- Transpilers (such as Babel/Typescript/SASS/SCSS/LESS etc etc)
- Sourcemaps (to go from your transpiled JS back to what you wrote)
- Minification
- File hashing (to allow browsers to load latest bundles as it has a different hash)
This is kind of what you get with webpack, where it is very clever about preserving the dependency graph and the bundles created, and allows you to use things like export and import to manage your inter dependencies
What is Babel?
Babel is a very neat JavaScript library that allows you to use next generation JavaScript syntax now, even if your targeted browser doesn't support the syntax you are trying to use. It is installed via the Node Package Manager (NPM) and is one part of your transpilation pipeline. For this article we push stuff through the TypeScript transpiler -> Babel transpiler, and the final result is JavaScript that can be sent to the browser.
Using Babel you can write code with newer language features, where native browser support for these may be varied, or missing altogether
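As a rough illustration of the kind of ES2015 syntax in question, here is a short sample. It is written as TypeScript (so it carries type annotations), since that is what the demo feeds into Babel; the Driver class and the helper functions are made up for this example and are not taken from the demo code.

```typescript
// ES2015 class
class Driver {
    name: string;
    rating: number;

    // Default parameter value
    constructor(name: string, rating: number = 5) {
        this.name = name;
        this.rating = rating;
    }

    // Template string
    describe(): string {
        return `${this.name} is rated ${this.rating}/10`;
    }
}

// Arrow functions (the copy via slice() leaves the input array untouched)
const bestFirst = (drivers: Driver[]): Driver[] =>
    drivers.slice().sort((a, b) => b.rating - a.rating);

// Object destructuring
function label(driver: Driver): string {
    const { name, rating } = driver;
    return `${name} (${rating})`;
}
```

Classes, default parameters, template strings, arrow functions and destructuring are all ES2015 features that an ES5-only browser cannot run directly, which is exactly the gap the transpilation step closes.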
What is SASS/SCSS/LESS
I guess if you have used HTML you will have certainly used CSS. Now CSS is great, but it lacks certain things such as
- Variables
- Hierarchy (nested) selectors
- Mixins
This is something that SASS/SCSS fixes. In much the same way that things like Typescript are transpiled to JavaScript, SASS/SCSS is transpiled to CSS.
By using SCSS you are now able to do things like this, note the variables
$blue: #3bbfce;
$margin: 16px;

.content_navigation {
  border-color: $blue;
  color: darken($blue, 10%);
}

.border {
  padding: $margin / 2;
  margin: $margin / 2;
  border-color: $blue;
}
What is React?
Unless you have been living under a rock you will have heard of/used/seen React. React is maintained and developed by Facebook, and it is a JavaScript library for building user interfaces. It is not as much of a framework as, say, Angular is; it is really just the "View" part of the typical MVC puzzle that is so prevalent in modern JS UI libraries/frameworks. It is declarative and component centric, and it is accepted that a React app will be made up of many smaller components. I can't give you a full walkthrough of React here, but we will certainly see more React code/components as the article demo code is dissected below
What is TypeScript?
Typescript is a Microsoft offering that attempts to bring better typings and other useful constructs/language features to JavaScript. As TypeScript is a superset of JavaScript, existing JavaScript programs are also valid TypeScript programs. Typescript files will get transpiled into regular javascript files via a Typescript compiler (or in the case of WebPack via a Webpack loader)
Some of the features that TypeScript brings to the table are
- Types
- Interfaces
- Parameter types
- Enums
- Generics
The demo code that goes with this article makes a lot of use of TypeScript (particularly TSX, which is the TypeScript version of a React JSX file), and also makes use of a nice Inversion Of Control container for TypeScript called Inversify.
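Here is a short sample that touches each of the features listed above. The names (UserKind, User, firstMatching, driversOnly) are invented for this illustration and are not taken from the demo code.

```typescript
// Enum
enum UserKind { Passenger, Driver }

// Interface with typed members
interface User {
    email: string;
    kind: UserKind;
}

// Generic function with typed parameters and a typed return value
function firstMatching<T>(items: T[], predicate: (item: T) => boolean): T | undefined {
    for (const item of items) {
        if (predicate(item)) { return item; }
    }
    return undefined;
}

// Parameter and return types are checked at compile time
function driversOnly(users: User[]): User[] {
    return users.filter(u => u.kind === UserKind.Driver);
}
```

Passing something that is not a User[] to driversOnly, or comparing kind against a plain string, would be rejected by the compiler before the code ever reaches the browser.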
The app
The next set of sections will walk through the demo app. I have decided to take a workflow approach to describing the app, and as such will walk/talk through each of the workflows, which I think is the best way to do it
The app has MANY moving parts, but can largely be broken down into the following areas
- Play backend API (scala)
- React front end (TypeScript)
- Kafka Streams (scala)
We will get to all of these parts, but before that let's examine some of the stuff that helps the front end work get off the ground
NPM requirements
The demo project uses quite a few components, such as
- TypeScript
- React
- Babel
- RX
- SCSS
- Various other libraries
As such we need a way to pull all these packages in, so for the front end side of things we use NPM, which you will need to have installed. The following shows the NPM dependencies for the demo app, taken from the PlayBackEndApi/FrontEndWebSite/package.json file
{
  "name": "task1webpackconfig",
  "version": "1.0.0",
  "description": "webpack 2 + TypeScript 2 + Babel example",
  "repository": {
    "type": "git",
    "url": "git+https://github.com/sachabarber/MadCapIdea.git"
  },
  "keywords": [
    "babel",
    "typescript",
    "webpack",
    "bundling",
    "javascript",
    "npm"
  ],
  "author": "sacha barber",
  "homepage": "https://github.com/sachabarber/MadCapIdea#readme",
  "dependencies": {
    "bootstrap": "^3.3.7",
    "inversify": "^4.1.0",
    "jquery": "^3.2.1",
    "lodash": "^4.17.4",
    "react": "^15.5.4",
    "react-bootstrap": "^0.28.1",
    "react-bootstrap-validation": "^0.1.11",
    "react-dom": "^15.5.4",
    "react-google-maps": "^7.0.0",
    "react-measure": "^2.0.2",
    "react-router": "^3.0.5",
    "react-stars": "^2.1.0",
    "reflect-metadata": "^0.1.10",
    "revalidator": "^0.3.1",
    "rx": "^4.1.0",
    "webpack": "^2.5.1",
    "webpack-merge": "^4.1.0"
  },
  "devDependencies": {
    "@types/jquery": "^2.0.43",
    "@types/lodash": "^4.14.63",
    "@types/react": "^15.0.24",
    "@types/react-dom": "^15.5.0",
    "@types/rx": "^4.1.1",
    "awesome-typescript-loader": "^3.1.3",
    "babel-core": "^6.24.1",
    "babel-loader": "^7.0.0",
    "babel-preset-es2015": "^6.24.1",
    "babel-preset-es2015-native-modules": "^6.9.4",
    "babel-preset-react": "^6.24.1",
    "css-loader": "^0.28.1",
    "extract-text-webpack-plugin": "^2.1.0",
    "file-loader": "^0.11.1",
    "html-webpack-plugin": "^2.28.0",
    "node-sass": "^4.5.2",
    "on-build-webpack": "^0.1.0",
    "sass-loader": "^6.0.3",
    "source-map-loader": "^0.2.1",
    "typescript": "^2.3.2",
    "url-loader": "^0.5.8",
    "webpack": "^2.4.1"
  },
  "scripts": {
    "build-dev": "webpack -d --config webpack.develop.js",
    "build-prod": "webpack --config webpack.production.js"
  }
}
Babel config
As stated above, the demo project makes use of Babel (babeljs.io) to use future state JavaScript now. As such we also need to supply a small Babel config file (PlayBackEndApi/FrontEndWebSite/.babelrc). This is shown below, where we opt to use the ES2015/React presets
{ "presets": ["es2015","react"] }
TypeScript config
The demo also makes use of TypeScript, which also needs some specific configuration. As such we also need to supply a small TypeScript config file (PlayBackEndApi/FrontEndWebSite/tsconfig.json). This is shown below; the full set of TypeScript options can be found here : https://www.typescriptlang.org/docs/handbook/tsconfig-json.html
{
  "compilerOptions": {
    "allowSyntheticDefaultImports": true,
    "moduleResolution": "node",
    "outDir": "./dist/",
    "sourceMap": true,
    "noImplicitAny": false,
    "module": "es2015",
    "target": "es5",
    "lib": ["es6", "dom"],
    "jsx": "react",
    "experimentalDecorators": true,
    "emitDecoratorMetadata": true,
    "types" : ["jquery", "lodash", "react", "react-dom", "reflect-metadata"]
  },
  "include": [
    "./src/**/*"
  ]
}
Sourcemap files
Sourcemaps are a magical thing that allow you to write your JavaScript in TypeScript say which is then possibly run through other transpilers (such as Babel) and then webpack where it is bundled according to your rules, where you then send the JavaScript to the browser. That sounds great, but do you write code write code first time, I don't, and constantly need to debug stuff.
So if I wrote stuff in TypeScript and that gets turned into plain vanilla JavaScript, how on earth do I make sense of the stuff that is in the browser?
Well, that is where sourcemaps come in. They literally send a clever map that allows you to put a breakpoint into your source code (which will be sent to the browser, ideally only in development phases), and the browser will know how to translate that breakpoint into the correct place/line in the transpiled vanilla JavaScript that it is actually running.
Webpack has in built support for sourcemaps (phew)
The best write up of source maps I have seen is this one :https://www.html5rocks.com/en/tutorials/developertools/sourcemaps/
It should be noted that SourceMaps should only be used in development phases, as in production you should/would want to minify your JavaScript too.
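As a minimal sketch of that rule, assuming a hypothetical `devtoolFor` helper and `BuildMode` type that are not part of the demo code, the choice could be expressed like this. The value "source-map" is the one the demo's webpack.config.js uses, and `false` is webpack's way of saying "emit no source maps".

```typescript
// Hypothetical helper (not in the demo repo): picks a webpack `devtool`
// value for the current build phase.
type BuildMode = "development" | "production";

function devtoolFor(mode: BuildMode): string | false {
  // "source-map" is the setting used in the demo's webpack.config.js;
  // `false` tells webpack not to emit source maps at all.
  return mode === "development" ? "source-map" : false;
}

// Fragments that could be merged into a develop/production config.
const developFragment = { devtool: devtoolFor("development") };
const productionFragment = { devtool: devtoolFor("production") };
```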
How does Webpack/Play work with this?
WebPack has this concept of loaders, which are what gets used to load different file types. So for example you can use a TypeScript loader that gets piped through a Babel loader, where the final result will be vanilla JavaScript. In the WebPack config file you can also
- Control how your final bundle will work, what name it has, where it will be generated, etc.
- Control how minification will work
- Configure loaders
- Turn on SourceMaps
- Configure things like jquery and lodash to appear in the usual places as $ and _ respectively
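The loader piping described above can be sketched in a few lines. This is a simplified model, not webpack's real loader API: a loader is treated as a plain string-to-string function, and `runLoaders`, `fakeBabel` and `fakeTypeScript` are hypothetical names made up for the illustration. The point it shows is that webpack applies the loaders of a rule from last to first, so the loader listed last sees the source file first.

```typescript
// A loader is modelled here as a plain string-to-string transform.
// This is a simplified stand-in, not webpack's real loader API.
type Loader = (source: string) => string;

// Apply the `use` array the way webpack does: last entry first.
function runLoaders(use: Loader[], source: string): string {
  return use.reduceRight((current, loader) => loader(current), source);
}

// Hypothetical stand-ins for 'babel-loader' and 'awesome-typescript-loader'.
const fakeBabel: Loader = (src) => "babel(" + src + ")";
const fakeTypeScript: Loader = (src) => "ts(" + src + ")";

// Babel is listed first and the TypeScript loader last,
// so TypeScript runs first and Babel runs on its output.
const piped = runLoaders([fakeBabel, fakeTypeScript], "index.tsx");
```

Here `piped` ends up as "babel(ts(index.tsx))", i.e. the TypeScript step wraps the source first and the Babel step wraps the result.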
Perhaps it's best to see the demo code's WebPack file, which looks like this
PlayBackEndApi/FrontEndWebSite/webpack.config.js
let _ = require('lodash'); let webpack = require('webpack'); let path = require('path'); let fs = require("fs"); let WebpackOnBuildPlugin = require('on-build-webpack'); let ExtractTextPlugin = require('extract-text-webpack-plugin'); let HtmlWebpackPlugin = require('html-webpack-plugin'); let babelOptions = { "presets": ["es2015", "react"] }; function isVendor(module) { return module.context && module.context.indexOf('node_modules') !== -1; } let entries = { index: './src/index.tsx', indexCss: './scss/index.scss' }; //build it to the Play Framework public folder, which is services by the assets controller let buildDir = path.resolve(__dirname, '../public/dist'); module.exports = { context: __dirname, entry: entries, output: { filename: '[name].bundle.[hash].js', path: buildDir, //this is to make it play nice with the Play Framework Assets controllers //that deals with static data publicPath: '/assets/dist' }, // these break for node 5.3+ when building WS stuff node: { fs: 'empty' }, watch: true, devServer: { open: true, // to open the local server in browser contentBase: __dirname, }, // Enable sourcemaps for debugging webpack's output. 
devtool: "source-map", resolve: { extensions: [".tsx", ".ts", ".js", ".jsx"], modules: [path.resolve(__dirname, "src"), "node_modules"] }, plugins: [ //The ProvidePlugin makes a module available as a variable in every other //module required by webpack new webpack.ProvidePlugin({ $: "jquery", jQuery: "jquery", "window.jQuery": "jquery" }), // creates a common vendor js file for libraries in node_modules new webpack.optimize.CommonsChunkPlugin({ names: ['vendor'], minChunks: function (module, count) { return isVendor(module); } }), // creates a common vendor js file for libraries in node_modules new webpack.optimize.CommonsChunkPlugin({ name: "commons", chunks: _.keys(entries), minChunks: function (module, count) { return !isVendor(module) && count > 1; } }), //will unlink unused files on a build //http://stackoverflow.com/questions/40370749/how-to-remove-old-files-from-the-build-dir-when-webpack-watch new WebpackOnBuildPlugin(function (stats) { const newlyCreatedAssets = stats.compilation.assets; const unlinked = []; fs.readdir(path.resolve(buildDir), (err, files) => { files.forEach(file => { if (file != "fonts") { if (!newlyCreatedAssets[file]) { fs.unlink(path.resolve(buildDir + '\\' + file)); unlinked.push(file); } } }); if (unlinked.length > 0) { console.log('Removed old assets: ', unlinked); } }) }), //scss/sass files extracted to common css bundle new ExtractTextPlugin({ filename: '[name].bundle.[hash].css', allChunks: true, }), new HtmlWebpackPlugin({ filename: 'index.html', template: 'template.html', }) ], module: { rules: [ // All files with a '.ts' or '.tsx' extension will be handled by 'awesome-typescript-loader' 1st // then 'babel-loader' // NOTE : loaders run right to left (think of them as a cmd line pipe) { test: /\.ts(x?)$/, exclude: /node_modules/, use: [ { loader: 'babel-loader', options: babelOptions }, { loader: 'awesome-typescript-loader' } ] }, // All files with a .css extenson will be handled by 'css-loader' { test: /\.css$/, loader: 
ExtractTextPlugin.extract(['css-loader?importLoaders=1']), }, // All files with a .scss|.sass extenson will be handled by 'sass-loader' { test: /\.(sass|scss)$/, loader: ExtractTextPlugin.extract(['css-loader', 'sass-loader']) }, // All files with a '.js' extension will be handled by 'babel-loader'. { test: /\.js$/, exclude: /node_modules/, use: [ { loader: 'babel-loader', options: babelOptions } ] }, { test: /\.png$/, loader: "url-loader?limit=100000" }, { test: /\.jpg$/, loader: "file-loader" }, { test: /\.woff(\?.*)?$/, loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/font-woff' }, { test: /\.woff2(\?.*)?$/, loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/font-woff2' }, { test: /\.ttf(\?.*)?$/, loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=application/octet-stream' }, { test: /\.eot(\?.*)?$/, loader: 'file-loader?prefix=fonts/&name=fonts/[name].[ext]' }, { test: /\.svg(\?.*)?$/, loader: 'url-loader?prefix=fonts/&name=fonts/[name].[ext]&limit=10000&mimetype=image/svg+xml' }, // All output '.js' files will have any sourcemaps re-processed by 'source-map-loader'. { enforce: "pre", test: /\.js$/, loader: "source-map-loader" } ] } };
Of particular note is this part
output: {
    filename: '[name].bundle.[hash].js',
    path: buildDir,
    //this is to make it play nice with the Play Framework Assets controllers
    //that deals with static data
    publicPath: '/assets/dist'
}
What that is doing is putting a hash into the file name, so that whenever the bundle changes the browser sees a new file name and loads it, rather than using old cached JavaScript. The other part of this is where the file gets generated. As this "front end" part still needs to be hosted in the back end (the Play framework for this article), we need to ensure that the final JavaScript ends up in a location that the Play framework is able to serve. For this article this means the static Play framework route, which is configured as follows in the PlayBackEndApi/routes file in the PlayBackEndApi project
GET /assets/*file controllers.Assets.at(path="/public", file)
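To make the naming and routing concrete, here is a small sketch under stated assumptions: `bundleFileName` and `resolveAsset` are hypothetical helpers written only for this illustration, and the real Play Assets controller does considerably more than a string rewrite. The sketch only mirrors the shape of the `[name].bundle.[hash].js` template and of the `/assets/*file` route.

```typescript
// Hypothetical helpers illustrating the naming and routing described above.

// Mirrors webpack's '[name].bundle.[hash].js' output template.
function bundleFileName(name: string, hash: string): string {
  return name + ".bundle." + hash + ".js";
}

// Mirrors the shape of the Play route:
//   GET /assets/*file  ->  controllers.Assets.at(path="/public", file)
// Returns the location under /public, or null for URLs the route does not match.
function resolveAsset(url: string): string | null {
  const prefix = "/assets/";
  if (url.indexOf(prefix) !== 0) {
    return null;
  }
  return "/public/" + url.slice(prefix.length);
}

const fileName = bundleFileName("index", "5c5feaa8663412cf31c5");
const served = resolveAsset("/assets/dist/" + fileName);
```

With these inputs `served` is "/public/dist/index.bundle.5c5feaa8663412cf31c5.js", which lines up with the `../public/dist` build directory used in the WebPack file.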
PlayBackEndApi/FrontEndWebSite/webpack.develop.js
This is a specialized develop phase variant, which uses the file webpack.config.js as its base. It looks like this
let commonConfig = require('./webpack.config.js');
let webpack = require('webpack');
let Merge = require('webpack-merge');

module.exports = function (env) {
    return Merge(commonConfig, {})
}
PlayBackEndApi/FrontEndWebSite/webpack.production.js
This is a specialized production phase variant (again using the file webpack.config.js as its base), where we do things like the following to make the emitted JavaScript production-like
- Minify it
- Strip comments
- Strip console.log (some browsers don't have it)
- Don't enable SourceMaps
let commonConfig = require('./webpack.config.js');
let webpack = require('webpack');
let Merge = require('webpack-merge');

module.exports = function (env) {
    return Merge(commonConfig, {
        plugins: [
            new webpack.LoaderOptionsPlugin({
                minimize: true,
                debug: false
            }),
            new webpack.optimize.UglifyJsPlugin({
                // Eliminate comments
                comments: false,
                beautify: false,
                mangle: {
                    screw_ie8: true,
                    keep_fnames: true
                },
                compress: {
                    screw_ie8: true,
                    // remove warnings
                    warnings: false,
                    // Drop console statements
                    drop_console: true
                },
                sourceMap: false
            })
        ]
    })
}
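Both variant files lean on webpack-merge to combine the base config with phase specific settings. The sketch below is a simplified model of its default behaviour (arrays are concatenated, nested objects are merged, other values are overwritten by the later config). `mergeConfig` and the string plugin names are hypothetical stand-ins for the illustration, not the library's implementation.

```typescript
// Simplified model of a config merge: NOT the webpack-merge implementation.
type Config = { [key: string]: unknown };

function isPlainObject(value: unknown): value is Config {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

function mergeConfig(base: Config, extra: Config): Config {
  const result: Config = {};
  for (const key of Object.keys(base)) {
    result[key] = base[key];
  }
  for (const key of Object.keys(extra)) {
    const a = result[key];
    const b = extra[key];
    if (Array.isArray(a) && Array.isArray(b)) {
      result[key] = a.concat(b);          // e.g. plugins: base plugins, then extra plugins
    } else if (isPlainObject(a) && isPlainObject(b)) {
      result[key] = mergeConfig(a, b);    // nested objects are merged key by key
    } else {
      result[key] = b;                    // everything else: the later value wins
    }
  }
  return result;
}

// Plugin names are plain strings here purely for illustration.
const common: Config = { devtool: "source-map", plugins: ["HtmlWebpackPlugin"] };
const production = mergeConfig(common, { plugins: ["UglifyJsPlugin"] });
```

Under this model, keys that the production file does not set, such as `devtool`, keep the value they have in the base file, while the `plugins` arrays are joined.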
The SPA
So the React front end is really a Single Page Application, as such you can expect to find a single page. So just where is that single page?
Well, we need to go back to some of the stuff in the webpack.config.js file, specifically these bits
//build it to the Play Framework public folder, which is services by the assets controller
let buildDir = path.resolve(__dirname, '../public/dist');

output: {
    filename: '[name].bundle.[hash].js',
    path: buildDir,
    //this is to make it play nice with the Play Framework Assets controllers
    //that deals with static data
    publicPath: '/assets/dist'
},
....
....
....
....
new HtmlWebpackPlugin({
    filename: 'index.html',
    template: 'template.html',
})
What this is saying is that Webpack will generate a file called index.html by using a template called template.html. The final file is written to the build directory (../public/dist), and the bundles it references are addressed under the public path /assets/dist, which if you go back a bit was mapped using the following in the PlayBackEndApi/routes file in the PlayBackEndApi project
GET /assets/*file controllers.Assets.at(path="/public", file)
PlayBackEndApi/FrontEndWebSite/template.html
So what does the template.html file look like? Well here it is. It also includes a CDN reference to the Google Maps API, as we use that in this article
<!DOCTYPE html> <html> <head> <meta charset="UTF-8" /> <title>Hello React!</title> <script src="https://maps.googleapis.com/maps/api/js?key=AIzaSyBVtreRNA537_WsNSn2_kOiz3Xhm8w6pEo" type="text/javascript"></script> </head> <body> <div> <iframe id="comet" src="/job/streamedJob"></iframe> </div> <div id="root"></div> <!-- Main --> </body> </html>
PlayBackEndApi/public/dist/index.html
So after WebPack has worked its magic, what does the final file look like? Well, it looks like this
<!DOCTYPE html> <html> <head> <meta charset="UTF-8" /> <title>Hello React!</title> <script src="https://maps.googleapis.com/maps/api/js?key=AIzaSyBVtreRNA537_WsNSn2_kOiz3Xhm8w6pEo" type="text/javascript"></script> <link href="/assets/dist/vendor.bundle.5c5feaa8663412cf31c5.css" rel="stylesheet"> <link href="/assets/dist/indexCss.bundle.5c5feaa8663412cf31c5.css" rel="stylesheet"></head> <body> <div> <iframe id="comet" src="/job/streamedJob"></iframe> </div> <div id="root"></div> <!-- Main --> <script type="text/javascript" src="/assets/dist/vendor.bundle.5c5feaa8663412cf31c5.js"></script> <script type="text/javascript" src="/assets/dist/index.bundle.5c5feaa8663412cf31c5.js"></script> </body> </html>
See how the correct CSS and JavaScript files have been placed into the HEAD and script tags automatically. This is great I think, and they also have nice hashes as part of the file name, which allows the browser caches to be easily invalidated
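The injection step can be pictured with a small sketch. This is a hedged stand-in for what the HTML plugin achieves, not its actual implementation: `injectAssets` is a hypothetical function, and the "abc123" hash in the file names is made up for the example.

```typescript
// Hypothetical illustration of asset injection into an HTML template.
// Stylesheets go just before </head>, scripts just before </body>.
function injectAssets(template: string, cssFiles: string[], jsFiles: string[]): string {
  const links = cssFiles
    .map((href) => '<link href="' + href + '" rel="stylesheet">')
    .join("");
  const scripts = jsFiles
    .map((src) => '<script type="text/javascript" src="' + src + '"></script>')
    .join("");
  return template
    .replace("</head>", links + "</head>")
    .replace("</body>", scripts + "</body>");
}

const page = injectAssets(
  '<html><head></head><body><div id="root"></div></body></html>',
  ["/assets/dist/indexCss.bundle.abc123.css"],
  ["/assets/dist/vendor.bundle.abc123.js", "/assets/dist/index.bundle.abc123.js"]
);
```

The order of the script list matters in this model: the vendor bundle is emitted before the index bundle, which matches the order seen in the generated index.html above.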
Play "Home" route
So that's great, but how does this page get served up? Well, the trick to that lies in this Play framework route entry
# Home page
GET / controllers.HomeController.index()
Which has this server side code to serve the route
package controllers

import javax.inject.Inject
import play.api.mvc.{Action, Controller}

class HomeController @Inject() (environment: play.api.Environment) extends Controller {

  def index() = Action {
    val fullpath = s"${environment.rootPath}\\public\\dist\\index.html"
    val htmlContents = scala.io.Source.fromFile(fullpath).mkString
    Ok(htmlContents).as("text/html")
  }
}
Routing
As I stated above, this demo is a Single Page App (SPA) type of app. So how is that done? It is done in the file PlayBackEndApi/FrontEndWebSite/src/index.tsx, which is also set as an entry point in the WebPack webpack.config.js file.
let entries = { index: './src/index.tsx', ..... ..... ..... ..... }; //build it to the Play Framework public folder, which is services by the assets controller let buildDir = path.resolve(__dirname, '../public/dist'); module.exports = { context: __dirname, entry: entries, ..... ..... ..... ..... ..... }
PlayBackEndApi/FrontEndWebSite/src/index.tsx
Where the actual routing work is done like this, where the main top level components will be
- Login
- Register
- Logout
- CreateJob
- ViewJob
- ViewRating
Some of which are conditional routes depending on whether you are logged in or not.
import * as React from "react"; import * as ReactDOM from "react-dom"; import 'bootstrap/dist/css/bootstrap.css'; import { Nav, Navbar, NavItem, NavDropdown, MenuItem, Button } from "react-bootstrap"; import { Router, Route, hashHistory } from 'react-router' import { Login } from "./Login"; import { Logout } from "./Logout"; import { Register } from "./Register"; import { CreateJob } from "./CreateJob"; import { ViewJob } from "./ViewJob"; import { ViewRating } from "./ViewRating"; import { ContainerOperations } from "./ioc/ContainerOperations"; import { AuthService } from "./services/AuthService"; import { JobService } from "./services/JobService"; import { JobStreamService } from "./services/JobStreamService"; import { PositionService } from "./services/PositionService"; import { TYPES } from "./types"; import Rx from 'rx'; let authService = ContainerOperations.getInstance().container.get<AuthService>(TYPES.AuthService); let jobService = ContainerOperations.getInstance().container.get<JobService>(TYPES.JobService); let jobStreamService = ContainerOperations.getInstance().container.get<JobStreamService>(TYPES.JobStreamService); let positionService = ContainerOperations.getInstance().container.get<PositionService>(TYPES.PositionService); jobStreamService.init(); export interface MainNavProps { authService: AuthService; jobService: JobService; jobStreamService: JobStreamService; positionService: PositionService; } export interface MainNavState { isLoggedIn: boolean; } class MainNav extends React.Component<MainNavProps, MainNavState> { private _subscription: any; constructor(props: any) { super(props); console.log(props); this.state = { isLoggedIn: false }; } componentWillMount() { this._subscription = this.props.authService.getAuthenticationStream() .subscribe(isAuthenticated => { this.state = { isLoggedIn: isAuthenticated }; if (this.state.isLoggedIn) { hashHistory.push('/createjob'); } else { hashHistory.push('/'); } }); } componentWillUnmount() { 
this._subscription.dispose(); } render() { return ( this.state.isLoggedIn ? <Navbar collapseOnSelect> <Navbar.Header> <Navbar.Brand> <span>Simple Kafka-Uber</span> </Navbar.Brand> <Navbar.Toggle /> </Navbar.Header> <Navbar.Collapse> <Nav pullRight> <NavItem eventKey={2} href='#/logout'>Logout</NavItem> <NavItem eventKey={2} href='#/createjob'>Create Job</NavItem> <NavItem eventKey={2} href='#/viewjob'>View Job</NavItem> <NavItem eventKey={2} href='#/viewrating'>View Rating</NavItem> </Nav> </Navbar.Collapse> </Navbar> : <Navbar pullRight collapseOnSelect> <Navbar.Header> <Navbar.Brand> <span>Simple Kafka-Uber</span> </Navbar.Brand> <Navbar.Toggle /> </Navbar.Header> <Navbar.Collapse> </Navbar.Collapse> </Navbar> ) } } class App extends React.Component<undefined, undefined> { render() { return ( <div> <div> <MainNav authService={authService} jobService={jobService} jobStreamService={jobStreamService} positionService={positionService} /> {this.props.children} </div> </div> ) } } ReactDOM.render(( <Router history={hashHistory}> <Route component={App}> <Route path="/" component={Login} authService={authService} /> <Route path="/register" component={Register} authService={authService} /> <Route path="/logout" component={Logout} authService={authService} jobService={jobService} positionService={positionService} /> <Route path="/createjob" component={CreateJob} authService={authService} jobService={jobService} positionService={positionService} /> <Route path="/viewjob" component={ViewJob} authService={authService} jobService={jobService} jobStreamService={jobStreamService} positionService={positionService} /> <Route path="/viewrating" component={ViewRating} authService={authService} /> </Route> </Router> ), document.getElementById('root'));
This makes use of ReactRouter version 3.0.5 to do the routing within the SPA, and also makes use of React-Bootstrap for rendering the NavBar
This TypeScript also hooks up an RxJS subscription to determine whether you are logged in/out. This is done using the following injectable service PlayBackEndApi/FrontEndWebSite/src/services/AuthService.ts
import { injectable, inject } from "inversify"; import { TYPES } from "../types"; import Rx from 'rx'; @injectable() export class AuthService { private _isAuthenticated: boolean; private _authenticatedSubject = new Rx.Subject<boolean>(); constructor() { } clearUser = (): void => { this._isAuthenticated = false; sessionStorage.removeItem('currentUserProfile'); this._authenticatedSubject.onNext(false); } storeUser = (currentProfile: any): void => { if (currentProfile == null || currentProfile == undefined) return; this._isAuthenticated = true; sessionStorage.setItem('currentUserProfile', JSON.stringify(currentProfile)); this._authenticatedSubject.onNext(true); } userName = (): string => { var userProfile = JSON.parse(sessionStorage.getItem('currentUserProfile')); return userProfile.user.fullName; } user = (): any => { var userProfile = JSON.parse(sessionStorage.getItem('currentUserProfile')); return userProfile.user; } userEmail = (): string => { var userProfile = JSON.parse(sessionStorage.getItem('currentUserProfile')); return userProfile.user.email; } isDriver = (): boolean => { var userProfile = JSON.parse(sessionStorage.getItem('currentUserProfile')); return userProfile.isDriver; } isAuthenticated = (): boolean => { return this._isAuthenticated; } getAuthenticationStream = (): Rx.Observable<boolean> => { return this._authenticatedSubject.asObservable(); } }
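To show the publish/subscribe pattern in isolation, here is a self-contained sketch that does not use the rx package. `MiniSubject` is a hypothetical, stripped down stand-in for `Rx.Subject` (it only supports `subscribe`, `onNext` and `dispose`), and the `visited` array stands in for the `hashHistory.push` calls made by the navigation component.

```typescript
// Minimal stand-in for Rx.Subject, written only for this illustration.
type Listener<T> = (value: T) => void;

class MiniSubject<T> {
  private listeners: Listener<T>[] = [];

  // Register a listener and hand back a handle that can remove it again.
  subscribe(listener: Listener<T>): { dispose: () => void } {
    this.listeners.push(listener);
    return {
      dispose: () => {
        this.listeners = this.listeners.filter((l) => l !== listener);
      }
    };
  }

  // Push a value to every listener that is currently subscribed.
  onNext(value: T): void {
    this.listeners.slice().forEach((l) => l(value));
  }
}

const authenticated = new MiniSubject<boolean>();
const visited: string[] = [];

// Same decision the navigation component makes when the stream ticks.
const subscription = authenticated.subscribe((isLoggedIn) => {
  visited.push(isLoggedIn ? "/createjob" : "/");
});

authenticated.onNext(true);   // like storeUser(...)
authenticated.onNext(false);  // like clearUser()
subscription.dispose();       // like componentWillUnmount()
authenticated.onNext(true);   // no longer observed
```

After this runs, `visited` holds "/createjob" followed by "/", and the final `onNext` is ignored because the subscription was disposed.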
Some common React / Bootstrap UI components
As with any UI work, you will inevitably end up where you need some core components that you re-use over and over again. For the demo app I had these 3 reusable React components that make use of React/React-Bootstrap
YesNoDialog
This represents a generic re-usable yes/no dialog that can be triggered, here is the code for this one. The important part is that the various prop values can be controlled via the parent component state values
import * as React from "react"; import * as ReactDOM from "react-dom"; import * as _ from "lodash"; import 'bootstrap/dist/css/bootstrap.css'; import { Button, Modal } from "react-bootstrap"; //TODO : Fix this export interface YesNoDialogProps { headerText: string; theId: string; launchButtonText: string; yesCallBack: any; noCallBack: any; } export interface YesNoDialogState { showModal: boolean; } export class YesNoDialog extends React.Component<YesNoDialogProps, YesNoDialogState> { constructor(props) { super(props); console.log(this.props); //set initial state this.state = { showModal: false }; } _yesClicked = () => { this.setState({ showModal: false }); this.props.yesCallBack(); } _noClicked = () => { this.setState({ showModal: false }); this.props.noCallBack(); } _close = () => { this.setState({ showModal: false }); } _open = () => { this.setState({ showModal: true }); } render() { return ( <div className="leftFloat"> <Button id={this.props.theId} type='button' bsSize='small' bsStyle='primary' onClick={this._open}>{this.props.launchButtonText}</Button> <Modal show={this.state.showModal} onHide={this._close}> <Modal.Header closeButton> <Modal.Title>{ this.props.headerText }</Modal.Title> </Modal.Header> <Modal.Body> <h4>Are you sure?</h4> </Modal.Body> <Modal.Footer> <Button type='button' bsSize='small' bsStyle='primary' onClick={this._yesClicked}>Yes</Button> <Button type='button' bsSize='small' bsStyle='danger' onClick={this._noClicked}>Cancel</Button> </Modal.Footer> </Modal> </div> ); } }
This looks like this when rendered
OkDialog
This represents a generic re-usable ok dialog that can be triggered, here is the code for this one. The important part is that the various prop values can be controlled via the parent component state values
import * as React from "react"; import * as ReactDOM from "react-dom"; import * as _ from "lodash"; import 'bootstrap/dist/css/bootstrap.css'; import { Button, Modal } from "react-bootstrap"; //TODO : Fix this export interface OkDialogProps { headerText: string; bodyText: string; open: boolean; okCallBack: any; } export interface OkDialogState { showModal: boolean; } export class OkDialog extends React.Component<OkDialogProps, OkDialogState> { constructor(props) { super(props); console.log(this.props); //set initial state this.state = { showModal: false }; } componentDidMount() { if (this.props.open === true) { this.setState({ showModal: true }); } } _okClicked = () => { this.setState({ showModal: false }); this.props.okCallBack(); } _close = () => { this.setState({ showModal: false }); this.props.okCallBack(); } _open = () => { this.setState({ showModal: true }); } render() { return ( <div className="leftFloat"> <Modal show={this.state.showModal} onHide={this._close}> <Modal.Header closeButton> <Modal.Title>{ this.props.headerText }</Modal.Title> </Modal.Header> <Modal.Body> <h4>{this.props.bodyText}</h4> </Modal.Body> <Modal.Footer> <Button type='button' bsSize='small' bsStyle='primary' onClick={this._okClicked}>Ok</Button> </Modal.Footer> </Modal> </div> ); } }
This looks like this when rendered
RatingDialog
This represents a generic re-usable rating control where rating can be from 1-5; here is the code for this one. The important part is that the various prop values can be controlled via the parent component state values
import * as React from "react";
import * as ReactDOM from "react-dom";
import * as _ from "lodash";
import 'bootstrap/dist/css/bootstrap.css';
import { Button, Modal } from "react-bootstrap";
import ReactStars from 'react-stars';

export interface RatingDialogProps {
    headerText: string;
    theId: string;
    okCallBack: any;
}

export interface RatingDialogState {
    showModal: boolean;
    rating: number;
}

export class RatingDialog extends React.Component<RatingDialogProps, RatingDialogState> {

    constructor(props) {
        super(props);
        console.log(this.props);
        //set initial state
        this.state = { showModal: false, rating: 0 };
    }

    _close = () => {
        this.setState({ showModal: false, rating: 0 });
    }

    _open = () => {
        this.setState({ showModal: true, rating: 0 });
    }

    _ratingChanged = (newRating) => {
        console.log(newRating)
        this.setState({ rating: newRating });
    }

    _okClicked = () => {
        this._close();
        this.props.okCallBack();
    }

    render() {
        return (
            <div className="leftFloat">
                <Button id={this.props.theId} type='button' bsSize='small' bsStyle='primary' onClick={this._open}>Complete</Button>
                <Modal show={this.state.showModal} onHide={this._close}>
                    <Modal.Header closeButton>
                        <Modal.Title>{ this.props.headerText }</Modal.Title>
                    </Modal.Header>
                    <Modal.Body>
                        <h4>Give your rating between 1-5</h4>
                        <ReactStars count={5} onChange={this._ratingChanged} size={24} color2={'#ffd700'} />
                    </Modal.Body>
                    <Modal.Footer>
                        <Button type='submit' bsSize='small' bsStyle='primary' onClick={this._okClicked}>Ok</Button>
                    </Modal.Footer>
                </Modal>
            </div>
        );
    }
}
This looks like this when rendered
For the rating component I make use of this React library : https://www.npmjs.com/package/react-stars
Registration workflow
The registration workflow is pretty much as described above, and uses the pieces shown above.
There are 2 types of registration, where we need to capture different data
- Passenger registration
- Driver registration
We will examine the passenger registration in detail, with the driver registration being fairly similar in nature
Passenger registration
Registration React component
PlayBackEndApi/FrontEndWebSite/src/PassengerRegistration.tsx is the main file for the React TSX representing the Passenger registration component.
import * as React from "react"; import * as ReactDOM from "react-dom"; import { OkDialog } from "./components/OkDialog"; import 'bootstrap/dist/css/bootstrap.css'; import { Well, Grid, Row, Col, ButtonInput } from "react-bootstrap"; import { AuthService } from "./services/AuthService"; import { hashHistory } from 'react-router'; import { Form, ValidatedInput } from 'react-bootstrap-validation'; import revalidator from 'revalidator'; let schema = { properties: { fullName: { type: 'string', minLength: 8, maxLength: 60, required: true, allowEmpty: false }, email: { type: 'string', maxLength: 255, format: 'email', required: true, allowEmpty: false }, password: { type: 'string', minLength: 8, maxLength: 60, required: true, allowEmpty: false } } }; export interface PassengerRegistrationProps { authService: AuthService; } export interface PassengerRegistrationState { okDialogOpen: boolean; okDialogKey: number; okDialogHeaderText: string; okDialogBodyText: string; wasSuccessful: boolean; } export class PassengerRegistration extends React.Component<PassengerRegistrationProps, PassengerRegistrationState> { constructor(props: any) { super(props); this.state = { okDialogHeaderText: '', okDialogBodyText: '', okDialogOpen: false, okDialogKey: 0, wasSuccessful: false }; } render() { return ( <Form className="submittable-form-inner" // Supply callbacks to both valid and invalid // submit attempts validateAll={this.validateForm} onInvalidSubmit={this.handleInvalidSubmit} onValidSubmit={this.handleValidSubmit}> <Grid> <Row className="show-grid"> <Col xs={10} md={6}> <h4>Passenger details</h4> </Col> </Row> <Row className="show-grid"> <Col xs={10} md={6}> <ValidatedInput type='text' label='FullName' name='fullName' errorHelp='FullName is invalid' /> </Col> </Row> <Row className="show-grid"> <Col xs={10} md={6}> <ValidatedInput type='text' label='Email' name='email' errorHelp='Email address is invalid' /> </Col> </Row> <Row className="show-grid"> <Col xs={10} md={6}> <ValidatedInput 
type='password' label='Password' name='password' errorHelp='Password is invalid' /> </Col> </Row> <Row className="show-grid"> <Col xs={10} md={6}> <ButtonInput id="registerBtn" type='submit' bsSize='small' bsStyle='primary' value='Register'>Register</ButtonInput> </Col> </Row> <Row className="show-grid"> <span> <OkDialog open={this.state.okDialogOpen} okCallBack={this.okDialogCallBack} headerText={this.state.okDialogHeaderText} bodyText={this.state.okDialogBodyText} key={this.state.okDialogKey} /> </span> </Row> </Grid> </Form> ) } validateForm = (values) => { let res = revalidator.validate(values, schema); // If the values passed validation, we return true if (res.valid) { return true; } // Otherwise we should return an object containing errors // e.g. { email: true, password: true } return res.errors.reduce((errors, error) => { // Set each property to either true or // a string error description errors[error.property] = true; return errors; }, {}); } handleInvalidSubmit = (errors, values) => { // Errors is an array containing input names // that failed to validate this.setState( { okDialogHeaderText: 'Validation Error', okDialogBodyText: 'Form has errors and may not be submitted', okDialogOpen: true, okDialogKey: Math.random() }); } handleValidSubmit = (values) => { var passenger = values; var self = this; $.ajax({ type: 'POST', url: 'registration/save/passenger', data: JSON.stringify(passenger), contentType: "application/json; charset=utf-8", dataType: 'json' }) .done(function (jdata, textStatus, jqXHR) { var redactedPassenger = passenger; redactedPassenger.password = ""; console.log("redacted ${redactedPassenger}"); console.log(redactedPassenger); console.log("Auth Service"); console.log(self.props.authService); let userProfile = { isDriver: false, user: redactedPassenger }; self.setState( { wasSuccessful: true, okDialogHeaderText: 'Registration Successful', okDialogBodyText: 'You are now registered', okDialogOpen: true, okDialogKey: Math.random() }); 
self.props.authService.storeUser(userProfile); }) .fail(function (jqXHR, textStatus, errorThrown) { self.setState( { okDialogHeaderText: 'Error', okDialogBodyText: jqXHR.responseText, okDialogOpen: true, okDialogKey: Math.random() }); }); } okDialogCallBack = () => { this.setState( { okDialogOpen: false }); if (this.state.wasSuccessful) { hashHistory.push('/'); } } }
There are a couple of things of note there
- That we use a standard POST
- That will post the registration data as JSON to the Play API backend code
- That we also show the standard Bootstrap OkDialog which we looked at earlier, which when the Ok button is clicked after a successful registration will use the React Router to navigate to the root page
- That we use the react-bootstrap-validation package for field validation
- That if the registration process is ok we store the user data in the AuthService.ts that we saw earlier, which in turn writes to the browser sessionStorage
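The validation step in validateForm boils down to turning a list of errors into a per-field lookup. The sketch below isolates that reduce. `ValidationError` is a trimmed down assumption about the shape of a revalidator error (only the `property` and `message` fields are modelled), and `toErrorMap` plus the sample errors are hypothetical.

```typescript
// Assumed, trimmed-down shape of one validation error.
interface ValidationError {
  property: string;
  message: string;
}

// Collapse the error list into { fieldName: true } so a form
// can look up whether a given field failed validation.
function toErrorMap(errors: ValidationError[]): { [field: string]: boolean } {
  return errors.reduce((map, error) => {
    map[error.property] = true;
    return map;
  }, {} as { [field: string]: boolean });
}

// Hypothetical sample errors, made up for the illustration.
const sampleErrors: ValidationError[] = [
  { property: "email", message: "is not a valid email" },
  { property: "password", message: "is too short" }
];
const errorMap = toErrorMap(sampleErrors);
```

Here `errorMap` has `email` and `password` set to true and no entry for `fullName`, which is the "object containing errors" shape that the comment in validateForm describes.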
Let's now turn our attention to the Play API backend code that goes with the Passenger Registration
Json Support
We need to start with the JSON support from JavaScript to Scala. This is done using the file PlayBackEndApi/app/Entities/PassengerRegistrationEntities.scala, where we use Play framework JSON combinators
- Reads : Which allows reading a JSON string into a Scala object
- Writes : Which allows a Scala object to be turned into a JSON string
- Format : is just a mix of the Reads and Writes Traits and can be used for implicit conversion in place of its components.
The recommendation for these is that they are exposed as implicit vals. You can read more about it here: https://www.playframework.com/documentation/2.6.x/ScalaJsonCombinators
package entities import play.api.libs.json._ import play.api.libs.functional.syntax._ case class PassengerRegistration( fullName: String, email: String, password: String) object PassengerRegistration { implicit val formatter = Json.format[PassengerRegistration] } object PassengerRegistrationJsonFormatters { implicit val passengerRegistrationWrites = new Writes[PassengerRegistration] { def writes(passengerRegistration: PassengerRegistration) = Json.obj( "fullName" -> passengerRegistration.fullName, "email" -> passengerRegistration.email, "password" -> passengerRegistration.password ) } implicit val passengerRegistrationReads: Reads[PassengerRegistration] = ( (JsPath \ "fullName").read[String] and (JsPath \ "email").read[String] and (JsPath \ "password").read[String] )(PassengerRegistration.apply _) }
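For readers more at home on the front end, the idea behind a Reads can be sketched in TypeScript. This is only an analogue under stated assumptions, not Play's API: `ReadResult` and `readPassengerRegistration` are hypothetical names, and the success and failure branches loosely mirror the JsSuccess and JsError cases that the controller pattern matches on.

```typescript
// TypeScript analogue of "read JSON into a typed object". Not Play's API.
interface PassengerRegistration {
  fullName: string;
  email: string;
  password: string;
}

// Success carries the typed value, failure carries the reasons.
type ReadResult<T> =
  | { ok: true; value: T }
  | { ok: false; errors: string[] };

function readPassengerRegistration(json: unknown): ReadResult<PassengerRegistration> {
  const obj = (typeof json === "object" && json !== null ? json : {}) as { [key: string]: unknown };
  const errors: string[] = [];
  for (const field of ["fullName", "email", "password"]) {
    if (typeof obj[field] !== "string") {
      errors.push(field + ": expected a string");
    }
  }
  if (errors.length > 0) {
    return { ok: false, errors: errors };
  }
  return {
    ok: true,
    value: {
      fullName: obj["fullName"] as string,
      email: obj["email"] as string,
      password: obj["password"] as string
    }
  };
}

// Hypothetical payloads, made up for the illustration.
const goodRead = readPassengerRegistration(
  JSON.parse('{"fullName":"Jane Example","email":"jane@example.com","password":"secret123"}')
);
const badRead = readPassengerRegistration(JSON.parse('{"fullName":"Jane Example"}'));
```

The second payload is missing two fields, so `badRead` reports one error for `email` and one for `password` instead of producing a half-filled object.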
Registration route
Once we have that in place we need to turn our attention to the actual endpoint to support the POST of a PassengerRegistration object. We first need to set up the route in the conf/routes file as follows:
# Registration page
POST /registration/save/passenger controllers.RegistrationController.savePassengerRegistration()
Registration controller
So now that we have talked about the JSON Reads/Writes, and we know that we need Mongo downloaded and running (see the "How do I run all this stuff?" section at the start), let's see what the actual controller looks like. Here is the FULL code for the Passenger Registration portion of the PlayBackEndApi/app/controllers/RegistrationController.scala
package controllers

import javax.inject.Inject
import play.api.mvc.{Action, Controller, Result}
import entities._
import entities.DriverRegistrationJsonFormatters._
import entities.PassengerRegistrationJsonFormatters._
import scala.concurrent.{ExecutionContext, Future}
import play.modules.reactivemongo._
import play.api.Logger
import utils.Errors
import play.api.libs.json._
import reactivemongo.api.ReadPreference
import reactivemongo.play.json._
import collection._

class RegistrationController @Inject() (val reactiveMongoApi: ReactiveMongoApi)
  (implicit ec: ExecutionContext)
  extends Controller with MongoController with ReactiveMongoComponents {

  def passRegistrationFuture: Future[JSONCollection] =
    database.map(_.collection[JSONCollection]("passenger-registrations"))

  def driverRegistrationFuture: Future[JSONCollection] =
    database.map(_.collection[JSONCollection]("driver-registrations"))

  def savePassengerRegistration = Action.async(parse.json) { request =>
    Json.fromJson[PassengerRegistration](request.body) match {
      case JsSuccess(newPassRegistration, _) =>

        //https://github.com/ReactiveMongo/ReactiveMongo-Extensions/blob/0.10.x/guide/dsl.md
        val query = Json.obj("email" -> Json.obj("$eq" -> newPassRegistration.email))

        dealWithRegistration[PassengerRegistration](
          newPassRegistration,
          passRegistrationFuture,
          query,
          PassengerRegistration.formatter)

      case JsError(errors) =>
        Future.successful(BadRequest("Could not build a PassengerRegistration from the json provided. " +
          Errors.show(errors)))
    }
  }

  private def dealWithRegistration[T](
    incomingRegistration: T,
    jsonCollectionFuture: Future[JSONCollection],
    query: JsObject,
    formatter: OFormat[T])
    (implicit ec: ExecutionContext): Future[Result] = {

    def hasExistingRegistrationFuture = jsonCollectionFuture.flatMap {
      //http://reactivemongo.org/releases/0.11/documentation/advanced-topics/collection-api.html
      _.find(query)
        .cursor[JsObject](ReadPreference.primary)
        .collect[List]()
    }.map(_.length match {
      case 0 => false
      case _ => true
    })

    hasExistingRegistrationFuture.flatMap {
      case false => {
        for {
          registrations <- jsonCollectionFuture
          writeResult <- registrations.insert(incomingRegistration)(formatter, ec)
        } yield {
          Logger.debug(s"Successfully inserted with LastError: $writeResult")
          Ok(Json.obj())
        }
      }
      case true => Future(BadRequest("Registration already exists"))
    }
  }
}
Let's break this down into chunks
- The controller constructor
  - This takes a ReactiveMongoApi (this is mandatory to satisfy the base trait MongoController requirements)
  - Inherits from MongoController, which provides a lot of useful functionality
  - It also inherits from ReactiveMongoComponents in order to allow the cake pattern/self typing requirements of the base MongoController, which expects a ReactiveMongoComponents
- The use of JSONCollection
  - There is a Future[JSONCollection] that represents the passenger collection in Mongo. This is a collection that stores JSON. When using reactive Mongo you have a choice about whether to use the standard BSON collections or JSON. I opted for JSON
- The Guts Of The Logic
  - So now we have discussed the controller constructor and the Mongo collections. We just need to talk about the actual work that happens on registration. In a nutshell it works like this
    - The incoming JSON string is turned into a PassengerRegistration object via Play
    - We then create a new JSON query object to query the Mongo JSONCollection to see if a registration already exists
    - If a registration already exists we exit with a BadRequest output
    - If a registration does NOT already exist we insert the new registration details into the Mongo JSONCollection, and then we return an Ok output
And that is how the Passenger Registration works.
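The four steps above can be sketched in a few lines of TypeScript. This is only an illustration of the flow, not the controller itself: `InMemoryRegistrations`, `saveRegistration` and the result shape are hypothetical names, and an array stands in for the Mongo collection.

```typescript
// Hypothetical shape: only `email` matters for the duplicate check.
interface PassengerRegistrationSketch {
  fullName: string;
  email: string;
  password: string;
}

type RegistrationResult =
  | { status: 200 }
  | { status: 400; message: string };

// Stand-in for the "passenger-registrations" Mongo collection (assumption: in-memory array).
class InMemoryRegistrations {
  private readonly rows: PassengerRegistrationSketch[] = [];

  find(email: string): PassengerRegistrationSketch[] {
    return this.rows.filter(r => r.email === email);
  }

  insert(reg: PassengerRegistrationSketch): void {
    this.rows.push(reg);
  }
}

function saveRegistration(
  store: InMemoryRegistrations,
  incoming: PassengerRegistrationSketch
): RegistrationResult {
  // Query by email to see whether a registration already exists.
  const exists = store.find(incoming.email).length > 0;
  if (exists) {
    // Existing registration: reject, like the BadRequest branch.
    return { status: 400, message: "Registration already exists" };
  }
  // No existing registration: insert and report success, like the Ok branch.
  store.insert(incoming);
  return { status: 200 };
}
```

Note that the check and the insert are two separate steps, so two concurrent requests for the same email could both pass the check. A unique index on the email field would be one way to close that gap.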
Driver registration
The driver registration works in much the same way as described above. It's just slightly different JSON, but it shares the same core logic/controller as the Passenger Registration
Login workflow
The login uses the pieces shown above.
This workflow is VERY similar to registration, so feel free to skip this section if you think you understand how registration concepts work
Login React component
This is the main PlayBackEndApi/FrontEndWebSite/src/Login.tsx file for the React TSX representing the login component.
import * as React from "react";
import * as ReactDOM from "react-dom";
import { OkDialog } from "./components/OkDialog";
import 'bootstrap/dist/css/bootstrap.css';
import { Well, Grid, Row, Col, ButtonInput } from "react-bootstrap";
import { Form, ValidatedInput } from 'react-bootstrap-validation';
import revalidator from 'revalidator';
import { AuthService } from "./services/AuthService";

let schema = {
    properties: {
        email: {
            type: 'string',
            maxLength: 255,
            format: 'email',
            required: true,
            allowEmpty: false
        },
        password: {
            type: 'string',
            minLength: 8,
            maxLength: 60,
            required: true,
            allowEmpty: false
        }
    }
};

export interface LoginState {
    okDialogOpen: boolean;
    okDialogKey: number;
    okDialogHeaderText: string;
    okDialogBodyText: string;
}

export class Login extends React.Component<undefined, LoginState> {
    private _authService: AuthService;

    constructor(props: any) {
        super(props);
        console.log(props);
        this._authService = props.route.authService;
        this.state = {
            okDialogHeaderText: '',
            okDialogBodyText: '',
            okDialogOpen: false,
            okDialogKey: 0
        };
    }

    render() {
        return (
            <Well className="outer-well">
                <Form
                    // Supply callbacks to both valid and invalid
                    // submit attempts
                    validateAll={this.validateForm}
                    onInvalidSubmit={this.handleInvalidSubmit}
                    onValidSubmit={this.handleValidSubmit}>
                    <Grid>
                        <Row className="show-grid">
                            <Col xs={10} md={6}>
                                <h4>ENTER YOUR LOGIN DETAILS</h4>
                                <span><h6>Or click <a href="#/register">here</a> to register</h6></span>
                            </Col>
                        </Row>
                        <Row className="show-grid">
                            <Col xs={10} md={6}>
                                <ValidatedInput type='text'
                                    label='Email'
                                    name='email'
                                    errorHelp='Email address is invalid' />
                            </Col>
                        </Row>
                        <Row className="show-grid">
                            <Col xs={10} md={6}>
                                <ValidatedInput type='password'
                                    name='password'
                                    label='Password'
                                    errorHelp='Password is invalid' />
                            </Col>
                        </Row>
                        <Row className="show-grid">
                            <Col xs={10} md={6}>
                                <ValidatedInput type='checkbox'
                                    name='isDriver'
                                    label='Are you a driver?'
                                />
                            </Col>
                        </Row>
                        <Row className="show-grid">
                            <Col xs={10} md={6}>
                                <ButtonInput
                                    id="loginBtn"
                                    type='submit'
                                    bsSize='small'
                                    bsStyle='primary'
                                    value='Register'>Login</ButtonInput>
                            </Col>
                        </Row>
                        <Row className="show-grid">
                            <span>
                                <OkDialog
                                    open={this.state.okDialogOpen}
                                    okCallBack={this.okDialogCallBack}
                                    headerText={this.state.okDialogHeaderText}
                                    bodyText={this.state.okDialogBodyText}
                                    key={this.state.okDialogKey} />
                            </span>
                        </Row>
                    </Grid>
                </Form>
            </Well>
        )
    }

    validateForm = (values) => {
        let res = revalidator.validate(values, schema);

        // If the values passed validation, we return true
        if (res.valid) {
            return true;
        }

        // Otherwise we should return an object containing errors
        // e.g. { email: true, password: true }
        return res.errors.reduce((errors, error) => {
            // Set each property to either true or
            // a string error description
            errors[error.property] = true;
            return errors;
        }, {});
    }

    handleInvalidSubmit = (errors, values) => {
        console.log(values);
        // Errors is an array containing input names
        // that failed to validate
        this.setState(
            {
                okDialogHeaderText: 'Validation Error',
                okDialogBodyText: 'Form has errors and may not be submitted',
                okDialogOpen: true,
                okDialogKey: Math.random()
            });
    }

    handleValidSubmit = (values) => {
        var logindetails = values;
        var self = this;

        $.ajax({
            type: 'POST',
            url: 'login/validate',
            data: JSON.stringify(logindetails),
            contentType: "application/json; charset=utf-8",
            dataType: 'json'
        })
        .done(function (jdata, textStatus, jqXHR) {
            console.log("result of login");
            console.log(jqXHR.responseText);
            let currentUser = JSON.parse(jqXHR.responseText);
            let userProfile = {
                isDriver: logindetails.isDriver,
                user: currentUser
            };
            self._authService.storeUser(userProfile);
            self.setState(
                {
                    okDialogHeaderText: 'Login Successful',
                    okDialogBodyText: 'You are now logged in',
                    okDialogOpen: true,
                    okDialogKey: Math.random()
                });
        })
        .fail(function (jqXHR, textStatus, errorThrown) {
            self.setState(
                {
                    okDialogHeaderText: 'Error',
                    okDialogBodyText: jqXHR.responseText,
                    okDialogOpen: true,
                    okDialogKey: Math.random()
                });
        });
    }

    okDialogCallBack = () => {
        this.setState(
            {
                okDialogOpen: false
            });
    }
}
There are a couple of things of note there
- That we use a standard POST
- That will post the login data as JSON to the Play API backend code
- That we use the react-bootstrap-validation package for field validation
- That if the login process is okay we store the user data in the AuthService (imported above from ./services/AuthService) that we saw earlier, where it in turn writes to the browser sessionStorage
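To make the last point concrete, here is a minimal sketch of a service that stores the logged in user profile in session storage. It is not the article's actual service: `AuthServiceSketch`, `KeyValueStore`, `MemoryStore`, `clearUser` and the storage key name are all assumptions made for illustration, and the store is passed in so that the sketch also runs outside a browser.

```typescript
// Minimal subset of the browser Storage API (hypothetical interface name).
interface KeyValueStore {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
  removeItem(key: string): void;
}

// In-memory stand-in so the sketch can run without a browser.
class MemoryStore implements KeyValueStore {
  private readonly items = new Map<string, string>();
  getItem(key: string): string | null {
    const found = this.items.get(key);
    return found === undefined ? null : found;
  }
  setItem(key: string, value: string): void { this.items.set(key, value); }
  removeItem(key: string): void { this.items.delete(key); }
}

interface UserProfileSketch {
  isDriver: boolean;
  user: { fullName: string; email: string };
}

// Hypothetical stand-in for the real service; the key name is an assumption.
class AuthServiceSketch {
  private static readonly KEY = "currentUserProfile";

  constructor(private readonly store: KeyValueStore) {}

  // Serialise the profile to JSON, since storage only holds strings.
  storeUser(profile: UserProfileSketch): void {
    this.store.setItem(AuthServiceSketch.KEY, JSON.stringify(profile));
  }

  isAuthenticated(): boolean {
    return this.store.getItem(AuthServiceSketch.KEY) !== null;
  }

  isDriver(): boolean {
    const raw = this.store.getItem(AuthServiceSketch.KEY);
    return raw !== null && (JSON.parse(raw) as UserProfileSketch).isDriver;
  }

  clearUser(): void {
    this.store.removeItem(AuthServiceSketch.KEY);
  }
}
```

In a browser you could pass the real `sessionStorage` object as the store, since it offers the same three methods.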
Let's now turn our attention to the Play API backend code that goes with the Login
Json Support
We need to start with the JSON support from JavaScript to Scala. This is done using the file PlayBackEndApi/app/Entities/LoginEntities.scala, where we use Play framework JSON combinators
- Reads : Which allows reading a JSON string into a Scala object
- Writes : Which allows a Scala object to be turned into a JSON string
- Format : is just a mix of the Reads and Writes Traits and can be used for implicit conversion in place of its components.
The recommendation is that these are exposed as implicit vals. You can read more about it here: https://www.playframework.com/documentation/2.6.x/ScalaJsonCombinators
package entities

import play.api.libs.json._
import play.api.libs.functional.syntax._

case class Login(email: String, password: String, isDriver: Boolean)

object Login {
  implicit val formatter = Json.format[Login]
}

object LoginJsonFormatters {

  implicit val loginWrites = new Writes[Login] {
    def writes(login: Login) = Json.obj(
      "email" -> login.email,
      "password" -> login.password,
      "isDriver" -> login.isDriver
    )
  }

  implicit val loginReads: Reads[Login] = (
    (JsPath \ "email").read[String] and
    (JsPath \ "password").read[String] and
    ((JsPath \ "isDriver").read[Boolean])
  )(Login.apply _)
}
Login route
Once we have that in place we need to turn our attention to the actual endpoint to support the POST of a Login object. We first need to set up the route in the conf/routes file as follows:
# Login page
POST    /login/validate    controllers.LoginController.validateLogin()
Login controller
So now that we have talked about the JSON Reads/Writes, and we know that we need Mongo downloaded and running (see the "How do I run all this stuff?" section at the start), let's see what the actual controller looks like. Here is the FULL code for PlayBackEndApi/app/controllers/LoginController.scala
package controllers

import javax.inject.Inject
import entities.DriverRegistrationJsonFormatters._
import entities.PassengerRegistrationJsonFormatters._
import entities._
import play.api.Logger
import play.api.libs.json._
import play.api.mvc.{Action, Controller, Result}
import play.modules.reactivemongo._
import reactivemongo.api.ReadPreference
import reactivemongo.play.json._
import reactivemongo.play.json.collection._
import utils.Errors

import scala.concurrent.{ExecutionContext, Future}

class LoginController @Inject() (val reactiveMongoApi: ReactiveMongoApi)
  (implicit ec: ExecutionContext)
  extends Controller with MongoController with ReactiveMongoComponents {

  def passRegistrationFuture: Future[JSONCollection] =
    database.map(_.collection[JSONCollection]("passenger-registrations"))

  def driverRegistrationFuture: Future[JSONCollection] =
    database.map(_.collection[JSONCollection]("driver-registrations"))

  def validateLogin = Action.async(parse.json) { request =>
    Json.fromJson[Login](request.body) match {
      case JsSuccess(newLoginDetails, _) =>
        newLoginDetails.isDriver match {
          case false => {
            val maybePassengerReg = extractExistingRegistration(
              passRegistrationFuture.flatMap {
                _.find(Json.obj("email" -> Json.obj("$eq" -> newLoginDetails.email))).
                  cursor[JsObject](ReadPreference.primary).
                  collect[List]()
              })
            returnRedactedRegistration[PassengerRegistration](
              maybePassengerReg,
              (reg: PassengerRegistration) => Ok(Json.toJson(reg.copy(password = "")))
            )
          }
          case true => {
            val maybeDriverReg = extractExistingRegistration(
              driverRegistrationFuture.flatMap {
                _.find(Json.obj("email" -> Json.obj("$eq" -> newLoginDetails.email))).
                  cursor[JsObject](ReadPreference.primary).
                  collect[List]()
              })
            returnRedactedRegistration[DriverRegistration](
              maybeDriverReg,
              (reg: DriverRegistration) => Ok(Json.toJson(reg.copy(password = "")))
            )
          }
        }
      case JsError(errors) =>
        Future.successful(BadRequest("Could not build a Login from the json provided. " +
          Errors.show(errors)))
    }
  }

  private def returnRedactedRegistration[T]
  (
    maybeDriverRegFuture: Future[Option[JsObject]],
    redactor : T => Result
  )(implicit reads: Reads[T]): Future[Result] = {
    maybeDriverRegFuture.map {
      case Some(json) => {
        val reg = Json.fromJson[T](json)
        reg match {
          case JsSuccess(reg, _) => {
            redactor(reg)
          }
          case _ => BadRequest("Login already exists")
        }
      }
      case None => BadRequest("Could not find login")
    }
  }

  private def extractExistingRegistration[T]
  (incomingRegistrations: Future[List[T]])
  (implicit writes: Writes[T], ec: ExecutionContext): Future[Option[T]] = {
    incomingRegistrations.map(matchedRegistrations =>
      matchedRegistrations.length match {
        case 0 => None
        case _ => Some(matchedRegistrations(0))
      }
    )
  }
}
This is similar enough in nature to the registration process, so I will not go into this any further.
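For readers who skipped the registration section, the flow of the login controller can be condensed into the following TypeScript sketch. The names (`validateLoginSketch`, `StoredRegistration`, `LoginResult`) are hypothetical and arrays stand in for the two Mongo collections. As in the listing above, the lookup is by email only, and the password is blanked out before the registration is returned.

```typescript
interface StoredRegistration {
  fullName: string;
  email: string;
  password: string;
}

type LoginResult =
  | { status: 200; body: StoredRegistration }
  | { status: 400; message: string };

// Hypothetical condensed version of the controller flow.
function validateLoginSketch(
  passengers: StoredRegistration[],
  drivers: StoredRegistration[],
  login: { email: string; isDriver: boolean }
): LoginResult {
  // Pick the collection based on the isDriver flag, as the controller does.
  const candidates = login.isDriver ? drivers : passengers;

  // Take the first registration with a matching email, if any.
  const match = candidates.find(r => r.email === login.email);
  if (match === undefined) {
    return { status: 400, message: "Could not find login" };
  }

  // Redact: return a copy with the password blanked, leaving the stored row untouched.
  return { status: 200, body: { ...match, password: "" } };
}
```

Returning a copy rather than mutating the stored object mirrors the `reg.copy(password = "")` call in the Scala code.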
CreateJob workflow
The CreateJob uses the pieces shown above, and is intended to work like this
CreateJob React component
This is the code for the CreateJob React component
import * as React from "react";
import * as ReactDOM from "react-dom";
import * as _ from "lodash";
import Measure from 'react-measure'
import { OkDialog } from "./components/OkDialog";
import 'bootstrap/dist/css/bootstrap.css';
import { Well, Grid, Row, Col, ButtonInput, ButtonGroup, Button } from "react-bootstrap";
import { AuthService } from "./services/AuthService";
import { JobService } from "./services/JobService";
import { PositionService } from "./services/PositionService";
import { UUIDService } from "./services/UUIDService";
import { Position } from "./domain/Position";
import { hashHistory } from 'react-router';
import { withGoogleMap, GoogleMap, Marker, InfoBox, OverlayView } from "react-google-maps";

const STYLES = {
    overlayView: {
        background: `white`,
        border: `1px solid #ccc`,
        padding: 15,
    },
    icon: {
        marginTop: 5,
        marginBottom: 5,
        marginLeft: 20
    }
}

const GetPixelPositionOffset = (width, height) => {
    return { x: -(width / 2), y: -(height / 2) };
}

const CreateJobGoogleMap = withGoogleMap(props => (
    <GoogleMap
        ref={props.onMapLoad}
        defaultZoom={16}
        defaultCenter={{ lat: 50.8202949, lng: -0.1406958 }}
        onClick={props.onMapClick}>
        <OverlayView
            key='createJobKey'
            mapPaneName={OverlayView.OVERLAY_MOUSE_TARGET}
            position={props.currentPosition}
            getPixelPositionOffset={GetPixelPositionOffset}>
            <div style={STYLES.overlayView}>
                <img style={STYLES.icon} src='/assets/images/passenger.png' />
                <br />
                <Button
                    type='button'
                    bsSize='xsmall'
                    bsStyle='primary'
                    onClick={() => props.onCreateJobClick()}
                    disabled={props.hasIssuedJob}
                    value='Create Job'>Create Job</Button>
            </div>
        </OverlayView>
    </GoogleMap>
));

export interface CreateJobState {
    currentPosition: Position;
    dimensions: {
        width: number,
        height: number
    };
    hasIssuedJob: boolean;
    okDialogOpen: boolean;
    okDialogKey: number;
    okDialogHeaderText: string;
    okDialogBodyText: string;
    wasSuccessful: boolean;
}

export class CreateJob extends React.Component<undefined, CreateJobState> {
    private _authService: AuthService;
    private _jobService: JobService;
    private _positionService: PositionService;

    constructor(props: any) {
        super(props);
        this._jobService = props.route.jobService;
        this._authService = props.route.authService;
        this._positionService = props.route.positionService;
        console.log(this._authService.userName());
        console.log(this._authService.userEmail());
        console.log("CreateJob ctor");
        console.log(this._jobService);

        if (!this._authService.isAuthenticated()) {
            hashHistory.push('/');
        }

        if (this._authService.isDriver()) {
            hashHistory.push('/viewjob');
        }

        this.state = {
            currentPosition: new Position(50.8202949, -0.1406958),
            dimensions: { width: -1, height: -1 },
            hasIssuedJob: this._jobService.hasIssuedJob(),
            okDialogHeaderText: '',
            okDialogBodyText: '',
            okDialogOpen: false,
            okDialogKey: 0,
            wasSuccessful: false
        };
    }

    render() {
        const adjustedwidth = this.state.dimensions.width;

        return (
            <Well className="outer-well">
                <Grid>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <h4>SET YOUR CURRENT LOCATION</h4>
                            <h6>Click the map to set your current location</h6>
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <Measure
                                bounds
                                onResize={(contentRect) => {
                                    this.setState({ dimensions: contentRect.bounds })
                                }}
                            >
                                {({ measureRef }) =>
                                    <div ref={measureRef}>
                                        <CreateJobGoogleMap
                                            containerElement={
                                                <div style={{
                                                    position: 'relative',
                                                    top: 0,
                                                    left: 0,
                                                    right: 0,
                                                    bottom: 0,
                                                    justifyContent: 'flex-end',
                                                    alignItems: 'center',
                                                    width: { adjustedwidth },
                                                    height: 600,
                                                    marginTop: 20,
                                                    marginLeft: 0,
                                                    marginRight: 0,
                                                    marginBottom: 20
                                                }} />
                                            }
                                            mapElement={
                                                <div style={{
                                                    position: 'relative',
                                                    top: 0,
                                                    left: 0,
                                                    right: 0,
                                                    bottom: 0,
                                                    width: { adjustedwidth },
                                                    height: 600,
                                                    marginTop: 20,
                                                    marginLeft: 0,
                                                    marginRight: 0,
                                                    marginBottom: 20
                                                }} />
                                            }
                                            onMapLoad={this.handleMapLoad}
                                            onMapClick={this.handleMapClick}
                                            currentPosition={this.state.currentPosition}
                                            onCreateJobClick={this.handleCreateJobClick}
                                            hasIssuedJob={this.state.hasIssuedJob}
                                        />
                                    </div>
                                }
                            </Measure>
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <span>
                            <OkDialog
                                open={this.state.okDialogOpen}
                                okCallBack={this.okDialogCallBack}
                                headerText={this.state.okDialogHeaderText}
                                bodyText={this.state.okDialogBodyText}
                                key={this.state.okDialogKey} />
                        </span>
                    </Row>
                </Grid>
            </Well>
        );
    }

    handleCreateJobClick = () => {
        var self = this;
        var currentUser = this._authService.user();

        var newJob = {
            jobUUID: UUIDService.createUUID(),
            clientFullName: currentUser.fullName,
            clientEmail: currentUser.email,
            clientPosition: {
                latitude: self.state.currentPosition.latitude,
                longitude: self.state.currentPosition.longitude
            },
            driverFullName: '',
            driverEmail: '',
            vehicleDescription: '',
            vehicleRegistrationNumber: '',
            isAssigned: false,
            isCompleted: false
        }

        $.ajax({
            type: 'POST',
            url: 'job/submit',
            data: JSON.stringify(newJob),
            contentType: "application/json; charset=utf-8",
            dataType: 'json'
        })
        .done(function (jdata, textStatus, jqXHR) {
            self._jobService.storeUserIssuedJob(newJob);
            const newState = Object.assign({}, self.state, {
                hasIssuedJob: self._jobService.hasIssuedJob()
            });
            self.setState(newState)
            self._positionService.storeUserPosition(self.state.currentPosition);
            hashHistory.push('/viewjob');
        })
        .fail(function (jqXHR, textStatus, errorThrown) {
            const newState = Object.assign({}, self.state, {
                okDialogHeaderText: 'Error',
                okDialogBodyText: jqXHR.responseText,
                okDialogOpen: true,
                okDialogKey: Math.random()
            })
            self.setState(newState)
        });
    }

    okDialogCallBack = () => {
        this.setState(
            {
                okDialogOpen: false
            });
    }

    handleMapLoad = (map) => {
        if (map) {
            console.log(map.getZoom());
        }
    }

    handleMapClick = (event) => {
        const newState = Object.assign({}, this.state, {
            currentPosition: new Position(event.latLng.lat(), event.latLng.lng())
        })
        this.setState(newState)
    }
}
As you can see this component makes use of a Google Maps component. I chose this one: https://tomchentw.github.io/react-google-maps/.
One thing to note is that I wanted the map to be responsive, but the Google Map component I chose needed to have a fixed size. So I had to use another React library to measure the screen at runtime and adjust the React Google Maps component on the fly. The Measure package is this one: https://github.com/souporserious/react-measure, which allows you to decorate/wrap your component in a measure type component.
As before we send the actual job JSON payload, but before doing that we need to allow the user to specify their position, so that the position can be retrieved later
Once the client sets their OWN position, they are able to create a job, and push out a new job. If they already have a job in flight the client is NOT able to create a new job
To deal with the user's current position, I also created this simple domain object and service class
export class Position {
    //my JSON API prefers nice names
    latitude: number;
    longitude: number;

    //map component wants these abbreviated names
    lat: number;
    lng: number;

    constructor(latitude: number, longitude: number) {
        this.latitude = latitude;
        this.longitude = longitude;
        //keep map happy
        this.lat = latitude;
        this.lng = longitude;
    }
}
import { injectable, inject } from "inversify";
import { Position } from "../domain/Position";
import { PositionMarker } from "../domain/PositionMarker";
import { TYPES } from "../../src/types";
import { AuthService } from "./AuthService";

@injectable()
export class PositionService {
    private _authService: AuthService;

    constructor( @inject(TYPES.AuthService) authService: AuthService) {
        this._authService = authService;
    }

    clearUserJobPositions = (): void => {
        let keyCurrentUserJobPositions = 'currentUserJobPositions_' + this._authService.userEmail();
        sessionStorage.removeItem(keyCurrentUserJobPositions);
    }

    storeUserJobPositions = (jobPositions: Array<PositionMarker>): void => {
        if (jobPositions == null || jobPositions == undefined)
            return;

        let currentUsersJobPositions = {
            currentUser: this._authService.user(),
            jobPositions: jobPositions
        }
        let keyCurrentUserJobPositions = 'currentUserJobPositions_' + this._authService.userEmail();
        sessionStorage.setItem(keyCurrentUserJobPositions, JSON.stringify(currentUsersJobPositions));
    }

    userJobPositions = (): Array<PositionMarker> => {
        let keyCurrentUserJobPositions = 'currentUserJobPositions_' + this._authService.userEmail();
        var currentUserJobPositions = JSON.parse(sessionStorage.getItem(keyCurrentUserJobPositions));
        return currentUserJobPositions.jobPositions;
    }

    hasJobPositions = (): boolean => {
        let keyCurrentUserJobPositions = 'currentUserJobPositions_' + this._authService.userEmail();
        var currentUserJobPositions = JSON.parse(sessionStorage.getItem(keyCurrentUserJobPositions));
        return currentUserJobPositions != null && currentUserJobPositions != undefined;
    }

    clearUserPosition = (): void => {
        let keyCurrentUserPosition = 'currentUserPosition_' + this._authService.userEmail();
        sessionStorage.removeItem(keyCurrentUserPosition);
    }

    storeUserPosition = (position: Position): void => {
        if (position == null || position == undefined)
            return;

        let currentUsersPosition = {
            currentUser: this._authService.user(),
            position: position
        }
        let keyCurrentUserPosition = 'currentUserPosition_' + this._authService.userEmail();
        sessionStorage.setItem(keyCurrentUserPosition, JSON.stringify(currentUsersPosition));
    }

    currentPosition = (): Position => {
        let keyCurrentUserPosition = 'currentUserPosition_' + this._authService.userEmail();
        var currentUsersPosition = JSON.parse(sessionStorage.getItem(keyCurrentUserPosition));
        return currentUsersPosition.position;
    }

    hasPosition = (): boolean => {
        let keyCurrentUserPosition = 'currentUserPosition_' + this._authService.userEmail();
        var currentUsersPosition = JSON.parse(sessionStorage.getItem(keyCurrentUserPosition));
        return currentUsersPosition != null && currentUsersPosition != undefined;
    }
}
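One detail of the service above is worth spelling out: currentPosition() dereferences the parsed value directly, so it throws if nothing has been stored yet, which is why hasPosition() exists as a guard. The sketch below shows the same per user key scheme with a read that returns null instead. Everything here (`PositionStore`, `MapBackedStore`, `storePosition`, `readPosition`) is a hypothetical illustration, not part of the real service, and a Map stands in for sessionStorage.

```typescript
// Minimal subset of the browser Storage API (hypothetical interface name).
interface PositionStore {
  getItem(key: string): string | null;
  setItem(key: string, value: string): void;
}

// In-memory stand-in for sessionStorage so the sketch runs anywhere.
class MapBackedStore implements PositionStore {
  private readonly items = new Map<string, string>();
  getItem(key: string): string | null {
    const found = this.items.get(key);
    return found === undefined ? null : found;
  }
  setItem(key: string, value: string): void { this.items.set(key, value); }
}

interface StoredPosition {
  latitude: number;
  longitude: number;
  lat: number;
  lng: number;
}

// Same key scheme as the service: one entry per user email.
const positionKey = (email: string): string => "currentUserPosition_" + email;

function storePosition(
  store: PositionStore, email: string, latitude: number, longitude: number
): void {
  const position: StoredPosition = { latitude, longitude, lat: latitude, lng: longitude };
  store.setItem(positionKey(email), JSON.stringify({ position }));
}

// Returns null instead of throwing when nothing has been stored for this user.
function readPosition(store: PositionStore, email: string): StoredPosition | null {
  const raw = store.getItem(positionKey(email));
  if (raw === null) return null;
  const parsed = JSON.parse(raw) as { position?: StoredPosition };
  return parsed.position === undefined ? null : parsed.position;
}
```

Also note that what comes back from JSON is a plain object with the four numeric fields, not an instance of the Position class.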
JSON Payload
As before we send this across to the Play back end using this route
POST /job/submit controllers.JobController.submitJob()
JobController
OK, so we now know that we have a new endpoint that can accept a job JSON object. What does it do with this Job JSON object? Quite simply, it does this
- Converts the JSON into a Scala object
- Sends it out over Kafka using Reactive Kafka publisher
You may be asking yourself why we want to burden ourselves with Kafka here at all, if all we are going to do is get a Job JSON payload in and then send it out via Kafka, only to have it come back in via Kafka. This seems weird, so why bother? The reason we want to involve Kafka here is for the audit and commit log facility that it provides. We want a record of the events, and that's what Kafka gives us: a nice append only log
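If the phrase "append only log" is unfamiliar, the idea can be shown with a toy in-memory version. This is only a conceptual sketch and has nothing to do with how Kafka is actually implemented; `AppendOnlyLog` and `LogRecord` are hypothetical names.

```typescript
interface LogRecord<T> {
  offset: number;
  key: string;
  value: T;
}

// Toy append only log: records are only ever added at the end.
class AppendOnlyLog<T> {
  private readonly records: LogRecord<T>[] = [];

  // Appending returns the offset the record was written at.
  append(key: string, value: T): number {
    const offset = this.records.length;
    this.records.push({ offset, key, value });
    return offset;
  }

  // Readers track their own offset; reading never removes anything,
  // so the full history can be replayed at any time.
  readFrom(offset: number): ReadonlyArray<LogRecord<T>> {
    return this.records.slice(offset);
  }
}
```

Because reading does not consume records, a second reader (an auditor, say) can start from offset 0 long after the first reader has moved on, which is the property the article is after.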
Anyway, what does the new endpoint code that accepts the job look like? Here it is
package controllers

import javax.inject.Inject

import entities.Job
import entities.JobJsonFormatters._
import entities._
import actors.job.{JobConsumerActor, JobProducerActor}
import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import play.api.http.ContentTypes
import play.api.libs.Comet
import play.api.libs.json._
import play.api.libs.json.Json
import play.api.libs.json.Format
import play.api.libs.json.JsSuccess
import play.api.libs.json.Writes
import play.api.mvc.{Action, Controller}
import utils.Errors

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import scala.concurrent.duration._

class JobController @Inject()
(
  implicit actorSystem: ActorSystem,
  ec: ExecutionContext
) extends Controller {

  val rand = new Random()

  //Error handling for streams
  //http://doc.akka.io/docs/akka/2.5.2/scala/stream/stream-error.html
  val decider: Supervision.Decider = {
    case _ => Supervision.Restart
  }

  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

  val (sink, source) =
    MergeHub.source[JsValue](perProducerBufferSize = 16)
      .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
      .run()

  //job producer
  val childJobProducerActorProps = Props(classOf[JobProducerActor], mat, ec)
  val jobProducerSupervisorProps = createBackoffSupervisor(childJobProducerActorProps,
    s"JobProducerActor_${rand.nextInt()}")
  val jobProducerSupervisorActorRef =
    actorSystem.actorOf(jobProducerSupervisorProps, name = "jobProducerSupervisor")

  //job consumer
  val childJobConsumerActorProps = Props(new JobConsumerActor(sink)(mat, ec))
  val jobConsumerSupervisorProps = createBackoffSupervisor(childJobConsumerActorProps,
    s"JobConsumerActor_${rand.nextInt()}")
  val jobConsumerSupervisorActorRef =
    actorSystem.actorOf(jobConsumerSupervisorProps, name = "jobConsumerSupervisor")
  jobConsumerSupervisorActorRef ! Init

  def streamedJob() = Action {
    Ok.chunked(source via Comet.json("parent.jobChanged")).as(ContentTypes.HTML)
  }

  def submitJob = Action.async(parse.json) { request =>
    Json.fromJson[Job](request.body) match {
      case JsSuccess(job, _) => {
        jobProducerSupervisorActorRef ! job
        Future.successful(Ok(Json.toJson(job.copy(clientEmail = job.clientEmail.toUpperCase))))
      }
      case JsError(errors) =>
        Future.successful(BadRequest("Could not build a Job from the json provided. " +
          Errors.show(errors)))
    }
  }

  private def createBackoffSupervisor(childProps: Props, actorChildName: String) : Props = {
    BackoffSupervisor.props(
      Backoff.onStop(
        childProps,
        childName = actorChildName,
        minBackoff = 3.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2
      ).withSupervisorStrategy(
        OneForOneStrategy() {
          case _ => SupervisorStrategy.Restart
        })
    )
  }
}
There is a fair bit going on in that code. Let's dissect it a bit
- We create a backoff supervisor for both the Kafka producer/consumer actors
- We create a stream that is capable of writing to the Comet frame socket
- We provide the sink side (MergeHub) of the stream to the consumer actor, such that when it reads a value from Kafka it will be pumped into the sink which will then travel through the Akka stream back to the web page via the BroadcastHub and Comet forever frame back to the HTML (and ultimately RxJs Subject)
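On the browser side, the last hop of that journey can be sketched as follows. As far as I understand Play's Comet helper, each element is written to the frame as a small script that calls the named callback (here parent.jobChanged) with the JSON value, so the hosting page needs a function of that name. The sketch below uses a tiny hand rolled subject rather than RxJs so that it is self-contained, and a plain object (`hostWindow`) stands in for the browser window. `TinySubject`, `JobMessage` and `hostWindow` are all hypothetical names.

```typescript
type Listener<T> = (value: T) => void;

// Very small stand-in for an RxJs Subject: push values in, fan them out to subscribers.
class TinySubject<T> {
  private listeners: Listener<T>[] = [];

  // Returns a function that removes the listener again.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  next(value: T): void {
    // Copy first so that unsubscribing during delivery is safe.
    this.listeners.slice().forEach(l => l(value));
  }
}

// Hypothetical subset of the job JSON; the real payload has more fields.
interface JobMessage {
  jobUUID: string;
  clientEmail: string;
}

const jobSubject = new TinySubject<JobMessage>();

// Assumption: in a real page this function would be attached to `window`,
// so that the script in the forever frame can reach it as parent.jobChanged.
const hostWindow = {
  jobChanged: (job: JobMessage): void => jobSubject.next(job)
};
```

Any component that cares about jobs can then call `jobSubject.subscribe(...)` without knowing anything about the forever frame.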
Push the consumed Job out of the forever frame (Comet functionality in Play backend)
Okay, so we just saw how the 2 actors are created under back off supervisors, and how the consumer (the one that reads from Kafka) gets the ability to essentially write back to the forever frame in the HTML which is like this
<iframe id="comet" src="/job/streamedJob"></iframe>
Where the actual route for Play framework is configured as this
GET /job/streamedJob controllers.JobController.streamedJob()
And this is the relevant part of the JobController that deals with exposing the comet stream to the browser
val (sink, source) =
  MergeHub.source[JsValue](perProducerBufferSize = 16)
    .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
    .run()
....
....
def streamedJob() = Action {
  Ok.chunked(source via Comet.json("parent.jobChanged")).as(ContentTypes.HTML)
}
So how does the job go out into Kafka land? That part is quite simple.
So diving back into what happens when a new Job comes in, we can see that this goes through the JobProducerActor below
package actors.job

import kafka.topics.JobTopics
import serialization.JSONSerde
import akka.Done
import akka.actor.{Actor, PoisonPill}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Keep, MergeHub, Source}
import akka.stream.{ActorMaterializer, KillSwitches}
import entities.Job
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import utils.Settings

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

class JobProducerActor(
    implicit materializer: ActorMaterializer,
    ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Job]
  val jobProducerSettings = ProducerSettings(
      context.system,
      new StringSerializer,
      new ByteArraySerializer)
    .withBootstrapServers(s"${Settings.bootStrapServers}")

  val ((mergeHubSink, killswitch), kafkaSourceFuture) =
    MergeHub.source[Job](perProducerBufferSize = 16)
      .map(job => {
        val jobBytes = jSONSerde.serializer().serialize("", job)
        (job, jobBytes)
      })
      .map { jobWithBytes =>
        val (job, jobBytes) = jobWithBytes
        new ProducerRecord[String, Array[Byte]](
          JobTopics.JOB_SUBMIT_TOPIC, job.clientEmail, jobBytes)
      }
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Producer.plainSink(jobProducerSettings))(Keep.both)
      .run()

  kafkaSourceFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => {
      self ! PoisonPill
    }
  }

  override def postStop(): Unit = {
    super.postStop()
    println(s"JobProducerActor seen 'Done'")
    killswitch.shutdown()
  }

  override def receive: Receive = {
    case (job: Job) => {
      println(s"JobProducerActor seen ${job}")
      Source.single(job).runWith(mergeHubSink)
    }
    case Done => {
      println(s"JobProducerActor seen 'Done'")
      killswitch.shutdown()
      self ! PoisonPill
    }
  }
}
I'll be honest, there is a fair bit going on in that small chunk of code above. What is happening exactly?
- The most important point is that we simply use the actor as a vessel to host a reactive Kafka Akka stream RunnableGraph representing a Graph of MergeHub > Reactive Kafka producer sink. This is completely fine and a normal thing to do. Discussing Akka streams is out of scope for this article, but if you want to know more you can read a previous post I did here: https://sachabarbs.wordpress.com/2016/12/13/akka-streams/
- So we now know this actor hosts a stream, but the stream could fail, or the actor could fail. So what we want is that if the actor fails the stream is stopped, and if the stream fails the actor is stopped. To do that we need to do a couple of things
  - STREAM FAILING : Since the RunnableGraph can return a Future[T], we can hook a Success/Failure callback on that, and send a PoisonPill to the hosting actor. Then the supervisor actor we saw above would kick in and try to create a new instance of this actor. Another thing to note is that the stream hosted in this actor uses the ActorMaterializer that was supplied by the JobController, where we provided a restart supervision strategy for the stream.
  - ACTOR FAILING : If the actor itself fails the Akka framework will call the postStop() method, at which point we want to shut down the stream within this actor. So how can we shut down the hosted stream? Well, see in the middle of the stream setup there is this line .viaMat(KillSwitches.single)(Keep.both) which allows us to get a KillSwitch from the materialized values for the stream. Once we have a KillSwitch we can simply call its shutdown() method.
  - BELTS AND BRACES : I have also provided a way for the outside world to shut down this actor and its hosted stream. This is via sending this actor a Done message. Nothing sends this message yet, but the hook is there to demonstrate how you could do this.
- We can see that there is a MergeHub source which allows external code to push stuff through the MergeHub via the materialized Sink value from within the actor
- We can also see that the Job object that the actor sees is indeed pushed into the MergeHub materialized Sink via this actor, and then some transformation is done on it, to grab its raw bytes
- We can see the final stage in the RunnableGraph is the Reactive Kafka Producer.plainSink. This results in a message being pushed out to a Kafka topic from the hosted stream for each Job object that this actor pushes into the stream
And I think that is the main set of points about how this actor works
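Since the supervisor keeps coming up, it may help to see roughly what the minBackoff = 3 seconds, maxBackoff = 30 seconds and randomFactor = 0.2 settings from the JobController mean in terms of wait time between restarts. The function below is a sketch of how I understand exponential backoff with jitter to work: the delay doubles per restart, is capped at the maximum, and then has a random extra added. It is an approximation for illustration, not Akka's code, and `backoffDelayMs` is a hypothetical name.

```typescript
// Approximate delay before the Nth restart (restartCount starts at 0).
// The `random` parameter is injectable so the jitter can be controlled.
function backoffDelayMs(
  restartCount: number,
  minBackoffMs: number,
  maxBackoffMs: number,
  randomFactor: number,
  random: () => number = Math.random
): number {
  // Jitter multiplier between 1 and (1 + randomFactor).
  const jitter = 1 + random() * randomFactor;
  // Double the delay for every restart seen so far.
  const exponential = minBackoffMs * Math.pow(2, restartCount);
  // Cap at the maximum before applying the jitter.
  return Math.min(maxBackoffMs, exponential) * jitter;
}
```

With jitter switched off (random returning 0) and the settings above, the successive delays would be 3, 6, 12 and 24 seconds, and then 30 seconds from there on. The random factor exists so that many failing actors do not all retry at exactly the same moment.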
Consume the Job over a Kafka Topic (using Akka Streams / Reactive Kafka)
Let's see the JobConsumerActor, which takes an Akka Stream Sink (the MergeHub from JobController) and pushes the value out to it when it sees a new value from Kafka on the job topic job-submit-topic. This then travels through the Akka stream, where it goes via the BroadcastHub out to the forever frame in the HTML.
Here is the code. It may look scary, but really it's just reading a value off the Kafka topic and pushing it out via the Sink (MergeHub), where the sink is hooked up to a forever frame that the Play framework supports. The forever frame is open and waiting for data to be sent to it via a comet endpoint, which you can read more about here : https://www.playframework.com/documentation/2.6.x/ScalaComet
package actors.job

import entities.{Job, Init}
import kafka.topics.JobTopics
import serialization.JSONSerde
import akka.{Done, NotUsed}
import akka.actor.{Actor, ActorSystem, PoisonPill}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.scaladsl.{Keep, MergeHub, Sink, Source}
import akka.stream.{ActorMaterializer, KillSwitches}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import play.api.libs.json.{JsValue, Json}
import utils.Settings
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

//TODO : This actor should take in a way of pushing back to Websocket
class JobConsumerActor
  (val sink:Sink[JsValue, NotUsed])
  (implicit materializer: ActorMaterializer, ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Job]

  val jobConsumerSettings = ConsumerSettings(
    context.system,new StringDeserializer(),new ByteArrayDeserializer())
    .withBootstrapServers(s"${Settings.bootStrapServers}")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // Materialized values: the KillSwitch lets us stop the stream,
  // the Future lets us react when the stream completes or fails
  val ((_, killswitch), kafkaConsumerFuture) =
    Consumer.committableSource(jobConsumerSettings, Subscriptions.topics(JobTopics.JOB_SUBMIT_TOPIC))
      .mapAsync(1) {
        msg => {
          val jobBytes = msg.record.value
          val job = jSONSerde.deserializer().deserialize(JobTopics.JOB_SUBMIT_TOPIC,jobBytes)
          self ! job
          msg.committableOffset.commitScaladsl()
        }
      }
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Sink.last)(Keep.both)
      .run()

  kafkaConsumerFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => {
      self ! PoisonPill
    }
  }

  override def postStop(): Unit = {
    super.postStop()
    println(s"JobConsumerActor seen 'Done'")
    killswitch.shutdown()
  }

  override def receive: Receive = {
    case (job: Job) => {
      println(s"JobConsumerActor seen ${job}")
      val finalJsonValue = Json.toJson(job)
      Source.single(finalJsonValue).runWith(sink)
    }
    case Done => {
      println(s"JobConsumerActor seen 'Done'")
      killswitch.shutdown()
      self ! PoisonPill
    }
    case Init => {
      println("JobConsumerActor saw init")
    }
  }
}
Have a new RxJs based Observable over the comet based forever frame, and ensure that is working
So at the end of the pipeline, we have a forever frame in the browser (always available within Index.html) that we wish to get events from. Ideally we want to turn this rather bland event into a better RxJs Observable, so how do we do that?
It's quite simple: we use this little service that is able to create a new Observable from the incoming event for us. If you recall, the Play server side stuff was like this
Ok.chunked(source via Comet.json("parent.jobChanged")).as(ContentTypes.HTML)
So this is how we turn that into a nice RxJS Observable
import { injectable, inject } from "inversify";
import { JobEventArgs } from "../domain/JobEventArgs";
import Rx from 'rx';

@injectable()
export class JobStreamService {

    private _jobSourceObservable: Rx.Observable<any>;

    constructor() {
    }

    init = (): void => {
        // The comet script pushed by Play calls parent.jobChanged(...),
        // which we re-raise as a DOM event that Rx can listen to
        window['jobChanged'] = function (incomingJsonPayload: any) {
            let evt = new CustomEvent('onJobChanged', new JobEventArgs(incomingJsonPayload));
            window.dispatchEvent(evt);
        }
        this._jobSourceObservable = Rx.Observable.fromEvent(window, 'onJobChanged');
    }

    getJobStream = (): Rx.Observable<any> => {
        return this._jobSourceObservable;
    }
}
Where the JobEventArgs looks like this
export class JobEventArgs {

    detail: any;

    constructor(detail: any) {
        this.detail = detail;
    }
}
We can then use this service in other code and subscribe to the RxJs Observable that the above service exposes. Here is an example of subscribing to it.
componentWillMount() {
    this._subscription =
        this._jobStreamService.getJobStream()
            .subscribe(
                jobArgs => {
                    console.log('RX saw onJobChanged');
                    console.log('RX x = ', jobArgs.detail);
                },
                error => {
                    console.log('RX saw ERROR');
                    console.log('RX error = ', error);
                },
                () => {
                    console.log('RX saw COMPLETE');
                }
            );
}
We will see much more of the RX stuff in the next part of this article, but for now all you need to know is that there is an injectable service that you may use to listen to the Job Observable
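If you want to see the shape of that bridge without pulling in RxJs or a browser, the sketch below may help. It is a dependency-free stand-in, not the RxJs API: CallbackBridge, JobSubscription and fakeWindow are names I have invented here, and a plain object plays the role of window so the code can run anywhere. The idea is the same as in JobStreamService: a globally reachable callback (the one the comet script calls) is turned into something many listeners can subscribe to and dispose of.

```typescript
// Minimal stand-in for an Rx-style subscription; not the RxJs API itself.
type JobListener<T> = (value: T) => void;

interface JobSubscription {
  dispose(): void;
}

class CallbackBridge<T> {
  private listeners: JobListener<T>[] = [];

  // Install a named callback on a host object (the browser would use `window`).
  attach(host: Record<string, unknown>, callbackName: string): void {
    host[callbackName] = (payload: T) => this.publish(payload);
  }

  subscribe(listener: JobListener<T>): JobSubscription {
    this.listeners.push(listener);
    return {
      dispose: () => {
        this.listeners = this.listeners.filter(l => l !== listener);
      }
    };
  }

  private publish(payload: T): void {
    // Iterate over a copy so a listener may dispose itself while being called.
    this.listeners.slice().forEach(l => l(payload));
  }
}

// A plain object plays the role of `window` (an assumption for this sketch).
const fakeWindow: Record<string, unknown> = {};
const bridge = new CallbackBridge<{ jobUUID: string }>();
bridge.attach(fakeWindow, "jobChanged");

const seen: string[] = [];
const subscription = bridge.subscribe(job => seen.push(job.jobUUID));

// The comet script would invoke parent.jobChanged(...) in much this way.
const jobChanged = fakeWindow["jobChanged"] as (p: { jobUUID: string }) => void;
jobChanged({ jobUUID: "job-1" });
subscription.dispose();
jobChanged({ jobUUID: "job-2" }); // nobody is listening any more

console.log(seen);
```

In the real service Rx.Observable.fromEvent does this work, and also gives you operators such as retry and where, which is why the article uses RxJs rather than a hand-rolled class like this one.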
Final diagram to help you solidify this section
There is a lot going on above so I thought a final diagram may help
Okay, now would be a good time to get a cup of coffee, as although the next 2 sections build upon what we have just seen, there are still quite a lot of moving pieces to cover, especially the Kafka Streams / Interactive Queries stuff which we have yet to look at. So go on treat yourself to a nice strong coffee.
ViewJob workflow
The ViewJob page uses the pieces shown above, and is intended to work like this
ViewJob react component
As can be seen above, this is after a passenger has opened a browser session, and 2 other users (acting as drivers) have pushed through their positions. Obviously this is just one view (the passenger view); to fully understand what the other users would see in their browser sessions we will walk through a scenario below.
So what should the View Job page do?
This page should do the following things
- If a passenger sends out a job it should be seen by ANY driver that is logged in (providing the job is not already assigned to a driver)
- Position updates from a passenger should be sent to the drivers that know about that passenger, and should show the new passenger position
- When a driver pushes out their new position (on a single laptop, users have to click on the map to make their own position known to others), the client should see that and update the driver marker accordingly
- That a passenger can accept a driver for a job
- That a driver can not accept a job from a passenger
- That once a job is paired between passenger/driver only those 2 markers will be shown if you are either of these users
- That once a job is paired between passenger/driver AND YOU ARE NOT ONE OF THESE USERS that you ONLY see your own markers
- That a job may be completed by passenger OR driver independently and that they are able to Rate each other
This page is fairly complex, and contains many helper methods that carry out the functions above, probably too many to show. So I will just show the skeleton of the code in this case, and then we will walk through some screen shots, and a scenario that exercises this code.
It should be noted that this area is the one that has the 2 bugs/issues that I have within this code. As I said at the start, I kind of did what I wanted to do in this article, and it demonstrated everything I wanted to show. So meh
import * as React from "react";
import * as ReactDOM from "react-dom";
import * as _ from "lodash";
import Measure from 'react-measure'
import { RatingDialog } from "./components/RatingDialog";
import { YesNoDialog } from "./components/YesNoDialog";
import { OkDialog } from "./components/OkDialog";
import { AcceptList } from "./components/AcceptList";
import 'bootstrap/dist/css/bootstrap.css';
import {
    Well, Grid, Row, Col, ButtonInput, ButtonGroup, Button,
    Modal, Popover, Tooltip, OverlayTrigger
} from "react-bootstrap";
import { AuthService } from "./services/AuthService";
import { JobService } from "./services/JobService";
import { JobStreamService } from "./services/JobStreamService";
import { PositionService } from "./services/PositionService";
import { Position } from "./domain/Position";
import { PositionMarker } from "./domain/PositionMarker";
import { hashHistory } from 'react-router';
import { withGoogleMap, GoogleMap, Marker, OverlayView } from "react-google-maps";

const STYLES = {
    overlayView: {
        background: `white`,
        border: `1px solid #ccc`,
        padding: 15,
    }
}

const GetPixelPositionOffset = (width, height) => {
    return { x: -(width / 2), y: -(height / 2) };
}

const ViewJobGoogleMap = withGoogleMap(props => (
    <GoogleMap
        ref={props.onMapLoad}
        defaultZoom={16}
        defaultCenter={{ lat: 50.8202949, lng: -0.1406958 }}
        onClick={props.onMapClick}>
        {props.markers.map((marker, index) => (
            <OverlayView
                key={marker.key}
                mapPaneName={OverlayView.OVERLAY_MOUSE_TARGET}
                position={marker.position}
                getPixelPositionOffset={GetPixelPositionOffset}>
                <div style={STYLES.overlayView}>
                    <img src={marker.icon} />
                    <strong>{marker.key}</strong>
                </div>
            </OverlayView>
        ))}
    </GoogleMap>
));

export interface ViewJobState {
    markers: Array<PositionMarker>;
    okDialogOpen: boolean;
    okDialogKey: number;
    okDialogHeaderText: string;
    okDialogBodyText: string;
    dimensions: { width: number, height: number },
    currentPosition: Position;
    isJobAccepted: boolean;
    finalActionHasBeenClicked: boolean;
}

type DoneCallback = (jdata: any, textStatus: any, jqXHR: any) => void

export class ViewJob extends React.Component<undefined, ViewJobState> {

    private _authService: AuthService;
    private _jobService: JobService;
    private _jobStreamService: JobStreamService;
    private _positionService: PositionService;
    private _subscription: any;
    private _currentJobUUID: any;

    constructor(props: any) {
        super(props);
        this._authService = props.route.authService;
        this._jobStreamService = props.route.jobStreamService;
        this._jobService = props.route.jobService;
        this._positionService = props.route.positionService;

        if (!this._authService.isAuthenticated()) {
            hashHistory.push('/');
        }

        let savedMarkers: Array<PositionMarker> = new Array<PositionMarker>();
        if (this._positionService.hasJobPositions()) {
            savedMarkers = this._positionService.userJobPositions();
        }

        this.state = {
            markers: savedMarkers,
            okDialogHeaderText: '',
            okDialogBodyText: '',
            okDialogOpen: false,
            okDialogKey: 0,
            dimensions: { width: -1, height: -1 },
            currentPosition: this._authService.isDriver() ? null :
                this._positionService.currentPosition(),
            isJobAccepted: false,
            finalActionHasBeenClicked: false
        };
    }

    componentWillMount() {
        var self = this;
        this._subscription =
            this._jobStreamService.getJobStream()
                .retry()
                .where(function (x, idx, obs) {
                    return self.shouldShowMarkerForJob(x.detail);
                })
                .subscribe(
                    jobArgs => {
                        console.log('RX saw onJobChanged');
                        console.log('RX x = ', jobArgs.detail);
                        this._jobService.clearUserIssuedJob();
                        this._jobService.storeUserIssuedJob(jobArgs.detail);
                        this.addMarkerForJob(jobArgs.detail);
                    },
                    error => {
                        console.log('RX saw ERROR');
                        console.log('RX error = ', error);
                    },
                    () => {
                        console.log('RX saw COMPLETE');
                    }
                );
    }

    componentWillUnmount() {
        this._subscription.dispose();
        this._positionService.storeUserJobPositions(this.state.markers);
    }

    render() {
        const adjustedwidth = this.state.dimensions.width;
        return (
            <Well className="outer-well">
                <Grid>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <h4>CURRENT JOB</h4>
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <AcceptList
                                markers={_.filter(this.state.markers, { isDriverIcon: true })}
                                currentUserIsDriver={this._authService.isDriver()}
                                clickCallback={this.handleMarkerClick} />
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <Measure
                                bounds
                                onResize={(contentRect) => {
                                    this.setState({ dimensions: contentRect.bounds })
                                }}>
                                {({ measureRef }) =>
                                    <div ref={measureRef}>
                                        <ViewJobGoogleMap
                                            containerElement={
                                                <div style={{
                                                    position: 'relative',
                                                    top: 0,
                                                    left: 0,
                                                    right: 0,
                                                    bottom: 0,
                                                    width: { adjustedwidth },
                                                    height: 600,
                                                    justifyContent: 'flex-end',
                                                    alignItems: 'center',
                                                    marginTop: 20,
                                                    marginLeft: 0,
                                                    marginRight: 0,
                                                    marginBottom: 20
                                                }} />
                                            }
                                            mapElement={
                                                <div style={{
                                                    position: 'relative',
                                                    top: 0,
                                                    left: 0,
                                                    right: 0,
                                                    bottom: 0,
                                                    width: { adjustedwidth },
                                                    height: 600,
                                                    marginTop: 20,
                                                    marginLeft: 0,
                                                    marginRight: 0,
                                                    marginBottom: 20
                                                }} />
                                            }
                                            markers={this.state.markers}
                                            onMapClick={this.handleMapClick}
                                        />
                                    </div>
                                }
                            </Measure>
                        </Col>
                    </Row>
                    {this.state.isJobAccepted === true ?
                        <Row className="show-grid">
                            <span>
                                <RatingDialog
                                    theId="viewJobCompleteBtn"
                                    headerText="Rate your driver/passenger"
                                    okCallBack={this.ratingsDialogOkCallBack}
                                    actionPerformed={this.state.finalActionHasBeenClicked} />

                                {!(this._authService.isDriver() === true) ?
                                    <YesNoDialog
                                        theId="viewJobCancelBtn"
                                        launchButtonText="Cancel"
                                        actionPerformed={this.state.finalActionHasBeenClicked}
                                        yesCallBack={this.jobCancelledCallBack}
                                        noCallBack={this.jobNotCancelledCallBack}
                                        headerText="Cancel the job" />
                                    :
                                    null
                                }

                                <OkDialog
                                    open={this.state.okDialogOpen}
                                    okCallBack={this.okDialogCallBack}
                                    headerText={this.state.okDialogHeaderText}
                                    bodyText={this.state.okDialogBodyText}
                                    key={this.state.okDialogKey} />
                            </span>
                        </Row>
                        :
                        null
                    }
                </Grid>
            </Well>
        );
    }

    handleMapClick = (event) => {
        ....
        ....
        this._positionService.clearUserJobPositions();
        this._positionService.storeUserJobPositions(this.state.markers);
        this.pushOutJob(newPosition, currentJob);
    }

    handleMarkerClick = (targetMarker) => {
        console.log('button on AcceptList clicked:' + targetMarker.key);
        console.log(targetMarker);

        let currentJob = this._jobService.currentJob();
        let jobForMarker = targetMarker.jobForMarker;

        let clientMarker = _.find(this.state.markers, { 'isDriverIcon': false });
        if (clientMarker != undefined && clientMarker != null) {
            let clientJob = clientMarker.jobForMarker;
            clientJob.driverFullName = jobForMarker.driverFullName;
            clientJob.driverEmail = jobForMarker.driverEmail;
            clientJob.driverPosition = jobForMarker.driverPosition;
            clientJob.vehicleDescription = jobForMarker.vehicleDescription;
            clientJob.vehicleRegistrationNumber = jobForMarker.vehicleRegistrationNumber;
            clientJob.isAssigned = true;

            let self = this;
            console.log("handleMarkerClick job");
            console.log(clientJob);

            this.makePOSTRequest('job/submit', clientJob, this,
                function (jdata, textStatus, jqXHR) {
                    console.log("After is accepted");
                    const newState = Object.assign({}, self.state, {
                        isJobAccepted: true
                    })
                    self.setState(newState);
                });
        }
    }

    addMarkerForJob = (jobArgs: any): void => {
        console.log("addMarkerForJob");
        console.log(this.state);

        if (this.state.isJobAccepted || jobArgs.isAssigned) {
            this.processAcceptedMarkers(jobArgs);
        }
        else {
            this.processNotAcceptedMarkers(jobArgs);
        }
    }

    processAcceptedMarkers = (jobArgs: any): void => {
        ....
        ....
    }

    processNotAcceptedMarkers = (jobArgs: any): void => {
        ....
        ....
    }

    addClientDetailsToDrivers = (newMarkersList: PositionMarker[]): void => {
        ....
    }

    updateStateForMarkers = (newState: any, newMarkersList: PositionMarker[],
        newPositionForUser: Position, jobArgs:any): void => {

        //Update the list of position markers in the PositionService
        this._positionService.clearUserJobPositions();
        this._positionService.storeUserJobPositions(newMarkersList);

        //Update the position in the PositionService
        if (newPositionForUser != undefined && newPositionForUser != null) {
            this._positionService.clearUserPosition();
            this._positionService.storeUserPosition(newPositionForUser);
        }

        this._jobService.clearUserIssuedJob();
        this._jobService.storeUserIssuedJob(jobArgs);

        //update the state
        this.setState(newState);
    }

    updateMatchedUserMarker = (jobEmailToCheck: string, newMarkersList: PositionMarker[],
        jobPosition: Position, jobForMarker:any): void => {

        if (jobEmailToCheck != undefined && jobEmailToCheck != null) {

            let matchedMarker = _.find(this.state.markers, { 'email': jobEmailToCheck });
            if (matchedMarker != null) {
                //update its position
                matchedMarker.position = jobPosition;
                matchedMarker.jobForMarker = jobForMarker;
            }
        }
    }

    updateStateForNewMarker = (newMarkersList:PositionMarker[], position: Position): any => {

        if (position != null) {
            return Object.assign({}, this.state, {
                currentPosition: position,
                markers: newMarkersList
            })
        }
        else {
            return Object.assign({}, this.state, {
                markers: newMarkersList
            })
        }
    }

    updateStateForAcceptedMarker = (newMarkersList: PositionMarker[], position: Position): any => {

        if (position != null) {
            return Object.assign({}, this.state, {
                currentPosition: position,
                markers: newMarkersList,
                isJobAccepted: true
            })
        }
        else {
            return Object.assign({}, this.state, {
                markers: newMarkersList,
                isJobAccepted: true
            })
        }
    }

    shouldShowMarkerForJob = (jobArgs: any): boolean => {

        let isDriver = this._authService.isDriver();
        let currentJob = this._jobService.currentJob();
        let hasJob = currentJob != undefined && currentJob != null;

        //case 1 - No job exists, to allow driver to add their mark initially
        if (!hasJob && isDriver)
            return true;

        //case 2 - Job exists and is unassigned and if there is no other active
        //         job for this client/ driver
        if (hasJob && !currentJob.isAssigned)
            return true;

        //case 3 - If the job isAssigned and its for the current logged in client/driver
        if (hasJob && currentJob.isAssigned) {
            if (currentJob.clientEmail == jobArgs.clientEmail) {
                return true;
            }
            if (currentJob.driverEmail == jobArgs.driverEmail) {
                return true;
            }
        }
        return false;
    }

    pushOutJob = (newPosition: Position, jobForMarker : any): void => {
        ....
        ....
        ....
        var newJob = {
            jobUUID: this._currentJobUUID != undefined && this._currentJobUUID != '' ?
                this._currentJobUUID : '',
            clientFullName: localClientFullName,
            clientEmail: localClientEmail,
            clientPosition: localClientPosition,
            driverFullName: localDriverFullName,
            driverEmail: localDriverEmail,
            driverPosition: localDriverPosition,
            vehicleDescription: isDriver ?
                this._authService.user().vehicleDescription : '',
            vehicleRegistrationNumber: isDriver ?
                this._authService.user().vehicleRegistrationNumber : '',
            isAssigned: localIsAssigned,
            isCompleted: false
        }

        console.log("handlpushOutJob job");
        console.log(newJob);

        this.makePOSTRequest('job/submit', newJob, self,
            function (jdata, textStatus, jqXHR) {
                self._jobService.clearUserIssuedJob();
                self._jobService.storeUserIssuedJob(newJob);
            });
    }

    createDriverMarker = ( driver: any, event: any): PositionMarker => {
        ....
    }

    ratingsDialogOkCallBack = (theRatingScore: number) => {
        console.log('RATINGS OK CLICKED');
        //POST rating data
    }

    makePOSTRequest = (route: string, jsonData: any, context: ViewJob,
        doneCallback: DoneCallback) => {
        //Post job data
    }

    ....
    ....
}
Some highlights
- We use RxJs to listen to new events straight off the Comet based forever frame, which the server side Play Scala code pushes messages out on
- There was a funny thing with driver acceptance, which I originally wanted to be a button on a driver's marker within the map. However this caused an issue with the Map, where it would get a Map event when clicking on an overlay (the overlay has a higher Z-Order, so this should not happen). This is a feature of the React Google Map component. I could not find a fix I liked (I did mess around with the mouseEnter/mouseLeave events, but it was just not that great), so I opted to put the acceptance of the driver outside of the map, thus avoiding the issue altogether
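The filtering that the Rx where clause applies (shouldShowMarkerForJob in the skeleton above) is easier to reason about when it is pulled out as a pure function. The sketch below is my own restatement of those 3 cases, not code from the repository: JobSummary and shouldShowMarker are hypothetical names, the services are replaced by plain parameters, and the email addresses are invented sample data.

```typescript
// Hypothetical, trimmed-down view of a job; the real Job has more fields.
interface JobSummary {
  clientEmail: string;
  driverEmail: string;
  isAssigned: boolean;
}

function shouldShowMarker(
  isDriver: boolean,
  currentJob: JobSummary | null,
  incoming: JobSummary
): boolean {
  // Case 1: no job stored yet, so only a driver may add their first marker.
  if (currentJob === null) return isDriver;
  // Case 2: the stored job is still unassigned, so every update is relevant.
  if (!currentJob.isAssigned) return true;
  // Case 3: the job is paired, so only updates for the same passenger or driver pass.
  return currentJob.clientEmail === incoming.clientEmail
    || currentJob.driverEmail === incoming.driverEmail;
}

// Sample data, invented for this sketch.
const paired: JobSummary = {
  clientEmail: "passenger@example.com",
  driverEmail: "driver1@example.com",
  isAssigned: true
};
const fromOtherPair: JobSummary = {
  clientEmail: "someone@example.com",
  driverEmail: "driver2@example.com",
  isAssigned: true
};

console.log(shouldShowMarker(true, null, fromOtherPair));    // driver with no job yet
console.log(shouldShowMarker(false, paired, fromOtherPair)); // unrelated update after pairing
console.log(shouldShowMarker(false, paired, paired));        // update for our own pairing
```

Written this way, each of the page requirements listed earlier can be checked with a one-line call, which is handy given that this is the area where the known bugs live.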
What does it look like when run?
Some scenarios of what it looks like running are shown below.
In order to run it to this point, I normally follow this set of steps afterwards
- open a tab, login as a passenger that I had created
- go to the create job page, click the map, push the create job button
- open a NEW tab, login as a new driver, go to the view job page
- on the 1st tab (passenger) click the map to push passenger position to driver
- on the 2nd tab (driver) click the map to push driver position to passenger
- repeat last 4 steps for additional driver
- on client tab pick driver to accept, click accept button
- complete the job from client tab, give driver rating
- complete the job from paired driver tab, give passenger rating
- go to view rating page, should see ratings
One of the challenges with an app like this is that it is a streaming app. This means that when a client pushes out a new job, there may be no-one listening for that job. Drivers may not even be logged in at all, or may log in later, so they effectively subscribe late. For this app, dealing with that was kind of out of scope. To work around it, you need to ensure that position updates (clicking on the map in a given user's browser session, i.e. tab) get pushed to the other user browser sessions where the marker is not currently shown.
In a real world app, we might choose to do one of the following to fix this permanently
- Store some, and when a new user joins grab all unassigned passengers/driver within some geographical area
- Store last N-Many passenger/driver positions and push these on new login (there is no guarantee that these are in the same geographical area as us though, could be completely unrelated/of no practical concern for the current user)
Anyway, as I say, this is out of scope for this demo project, but I hope that it does give you some insight as to why you need to push position updates manually
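For what it is worth, the "store the last N positions and push them on new login" option could look something like the sketch below. This is not part of the demo project, and PositionUpdate, ReplayBuffer and the sample emails/coordinates are all invented for illustration. It keeps a bounded buffer of the most recent updates and replays them to anyone who subscribes late, before delivering live updates.

```typescript
// Hypothetical shape of a position update; not a type from the demo project.
interface PositionUpdate {
  email: string;
  lat: number;
  lng: number;
}

class ReplayBuffer<T> {
  private items: T[] = [];
  private listeners: Array<(item: T) => void> = [];

  constructor(private capacity: number) {}

  publish(item: T): void {
    this.items.push(item);
    // Drop the oldest entry once we exceed the configured capacity.
    if (this.items.length > this.capacity) this.items.shift();
    this.listeners.forEach(listener => listener(item));
  }

  subscribe(listener: (item: T) => void): void {
    // Late subscribers first receive whatever is still buffered...
    this.items.forEach(item => listener(item));
    // ...and then every live update from here on.
    this.listeners.push(listener);
  }
}

const positions = new ReplayBuffer<PositionUpdate>(2);
positions.publish({ email: "passenger@example.com", lat: 50.82, lng: -0.14 });
positions.publish({ email: "driver1@example.com", lat: 50.83, lng: -0.15 });
positions.publish({ email: "driver2@example.com", lat: 50.84, lng: -0.16 });

// A driver who logs in late still learns about the 2 most recent positions.
const receivedByLateDriver: string[] = [];
positions.subscribe(update => receivedByLateDriver.push(update.email));
positions.publish({ email: "passenger@example.com", lat: 50.85, lng: -0.17 });

console.log(receivedByLateDriver);
```

RxJs ships a ReplaySubject that behaves along these lines on the client; on the server you would still have to decide where the buffer lives and, as noted above, whether the buffered positions are even geographically relevant to the user who just logged in.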
This gives you an example of what it all looks like when it's running (not accepted yet)
This is what it looks like for the following setup
- 1 x passenger (sacha barber)
- 2 x driver (driver 1 / driver 2)
So now lets see what happens when we accept one of the drivers. I have chosen driver 1 for this example
This gives you an example of what it all looks like when it's running (after job accepted between passenger/driver1)
Here is what things look like after job has been accepted
ViewRating workflow
The Rating flow is one of the more complex aspects of this entire article. The idea is as follows:
- At the end of a job the passenger may rate the driver for the Job
- At the end of a job the driver may rate the passenger for the Job
- These Ratings should be stored/aggregated and exposed via a REST API for the client/driver such that they can view their accumulated Rating over time
Simple enough idea right, but this part will use Kafka Streams
Walking through a Kafka Streams processing node, and the duality of streams
Before we get started I just wanted to include several excerpts taken from the official Kafka docs : http://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables which talk about KStream and KTable objects (which are the stream and table objects inside Kafka Streams)
When implementing stream processing use cases in practice, you typically need both streams and also databases. An example use case that is very common in practice is an e-commerce application that enriches an incoming stream of customer transactions with the latest customer information from a database table. In other words, streams are everywhere, but databases are everywhere, too.
Any stream processing technology must therefore provide first-class support for streams and tables. Kafka's Streams API provides such functionality through its core abstractions for streams and tables, which we will talk about in a minute. Now, an interesting observation is that there is actually a close relationship between streams and tables, the so-called stream-table duality. And Kafka exploits this duality in many ways: for example, to make your applications elastic, to support fault-tolerant stateful processing, or to run interactive queries against your application's latest processing results. And, beyond its internal usage, the Kafka Streams API also allows developers to exploit this duality in their own applications.
A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:
The stream-table duality describes the close relationship between streams and tables.
- Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a real table by replaying the changelog from beginning to end to reconstruct the table. Similarly, aggregating data records in a stream will return a table. For example, we could compute the total number of pageviews by user from an input stream of pageview events, and the result would be a table, with the table key being the user and the value being the corresponding pageview count.
- Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream's data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a real stream by iterating over each key-value entry in the table.
Let's illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time and different revisions of the table can be represented as a changelog stream (second column).
Because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):
The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault tolerance. The stream-table duality is such an important concept for stream processing applications in practice that Kafka Streams models it explicitly via the KStream and KTable abstractions, which we describe in the next sections.
I would STRONGLY urge you all to read the section of the official docs above, as it will really help you should you want to get into Kafka Streams.
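The duality described in the excerpts above is small enough to sketch in a few lines. The following is plain TypeScript, not the Kafka Streams API, and the names (ChangeRecord, replayChangelog, tableToStream, countPageviews) and the sample users are my own. It shows the pageview example in both directions: aggregating events gives a changelog, replaying the changelog gives the table, and iterating the table gives a stream again.

```typescript
// One changelog entry: "the count for this user is now N".
interface ChangeRecord {
  user: string;
  count: number;
}

// Stream as table: replaying the changelog keeps the latest value per key.
function replayChangelog(changelog: ChangeRecord[]): Record<string, number> {
  const table: Record<string, number> = {};
  changelog.forEach(change => {
    table[change.user] = change.count;
  });
  return table;
}

// Table as stream: iterating the table yields one record per key.
function tableToStream(table: Record<string, number>): ChangeRecord[] {
  return Object.keys(table).map(user => ({ user: user, count: table[user] }));
}

// Aggregating raw pageview events produces the changelog in the first place.
function countPageviews(users: string[]): ChangeRecord[] {
  const running: Record<string, number> = {};
  return users.map(user => {
    running[user] = (running[user] || 0) + 1;
    return { user: user, count: running[user] };
  });
}

const pageviewChangelog = countPageviews(["alice", "bob", "alice"]);
const pageviewTable = replayChangelog(pageviewChangelog);
const rebuiltTable = replayChangelog(tableToStream(pageviewTable));

console.log(pageviewChangelog); // three state changes
console.log(pageviewTable);     // latest count per user
console.log(rebuiltTable);      // same table, rebuilt from its own stream
```

In Kafka Streams the changelog lives in a topic and the table in a state store, but the relationship between the two is the one shown here.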
Anyway, with all that in mind, how does that relate to the use case we are trying to solve? Let's assume we have a Kafka publisher that pushes out Rating objects, and as stated ideally we would like to query these across all processor nodes. As such we should now know that this will involve a KStream and some sort of aggregation to an eventual KTable (where a state store will be used).
Probably the easiest thing to do is to start with the code, which looks like this for the main stream processing code for the Rating section of the final app.
MadCapIdea/blob/develop/KafkaStreams/src/main/scala/Processing/Ratings/RatingStreamProcessingApp.scala
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream._
import entities.Rating
import serialization.JSONSerde
import topics.RatingsTopics
import utils.Settings
import stores.StateStores
import org.apache.kafka.streams.state.HostInfo
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

package processing.ratings {

  import org.apache.kafka.streams.errors.BrokerNotFoundException
  import utils.Retry

  // Start each key's aggregate as an empty list
  class RatingByEmailInitializer extends Initializer[List[Rating]] {
    override def apply(): List[Rating] = List[Rating]()
  }

  // Prepend each new Rating to the list held for that key
  class RatingByEmailAggregator extends Aggregator[String, Rating, List[Rating]] {
    override def apply(aggKey: String, value: Rating, aggregate: List[Rating]) = {
      value :: aggregate
    }
  }

  object RatingStreamProcessingApp extends App {

    implicit val ec = ExecutionContext.global

    run()

    private def run(): Unit = {

      val restEndpoint: HostInfo = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
      System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
      System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")

      val maybeStreams =
        Retry.whileSeeingExpectedException[KafkaStreams,BrokerNotFoundException](10.seconds)(createStreams)

      maybeStreams match {
        case Some(streams) => {
          val restService = new RatingRestService(streams, restEndpoint)
          restService.start()
          Runtime.getRuntime.addShutdownHook(new Thread(() => {
            streams.close(10, TimeUnit.SECONDS)
            restService.stop
          }))
        }
        case None => {
          println("Quitting due to no streams available/unknown exception")
        }
      }

      //return unit
      ()
    }

    def createStreams() : KafkaStreams = {
      val stringSerde = Serdes.String
      val ratingSerde = new JSONSerde[Rating]
      val listRatingSerde = new JSONSerde[List[Rating]]
      val builder: KStreamBuilder = new KStreamBuilder
      val ratings = builder.stream(stringSerde, ratingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)

      //aggregate by (user email -> their ratings)
      val ratingTable = ratings.groupByKey(stringSerde, ratingSerde)
        .aggregate(
          new RatingByEmailInitializer(),
          new RatingByEmailAggregator(),
          listRatingSerde,
          StateStores.RATINGS_BY_EMAIL_STORE
        )

      //useful debugging aid, print KTable contents
      ratingTable.toStream.print()

      val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)

      // Always (and unconditionally) clean local state prior to starting the processing topology.
      // We opt for this unconditional call here because this will make it easier for you to
      // play around with the example when resetting the application for doing a re-run
      // (via the Application Reset Tool,
      // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
      //
      // The drawback of cleaning up local state prior is that your app must rebuild its local
      // state from scratch, which will take time and will require reading all the state-relevant
      // data from the Kafka cluster over the network.
      // Thus in a production scenario you typically do not want to clean up always as we do
      // here but rather only when it is truly needed, i.e., only under certain conditions
      // (e.g., the presence of a command line flag for your app).
      // See `ApplicationResetExample.java` for a production-like example.
      streams.cleanUp();
      streams.start()
      streams
    }
  }
}
Remember the idea is to get a Rating for a user (based on their email address), and store all the Rating objects associated with them in some sequence/list such that they can be retrieved in one go based on a key, where the key would be the user's email, and the value would be this list of Rating objects. I think with the formal discussion from the official Kafka docs and my actual Rating requirement, the above should hopefully be pretty clear.
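If the Initializer/Aggregator pair above feels abstract, it may help to see the same fold written without Kafka at all. The sketch below is plain TypeScript with invented names (RatingRecord, aggregateByKey) and invented sample data, and the fields on RatingRecord are a guess, since the real Rating entity is not shown in this section. The initializer supplies the empty list for a key seen for the first time, and the aggregator prepends each new value, just like value :: aggregate does in the Scala code.

```typescript
// Hypothetical shape; the real Rating entity is not shown in this section.
interface RatingRecord {
  fromEmail: string;
  toEmail: string;
  score: number;
}

type InitializerFn<A> = () => A;
type AggregatorFn<V, A> = (key: string, value: V, aggregate: A) => A;

// Folds keyed records into a table of key -> aggregate.
function aggregateByKey<V, A>(
  records: Array<{ key: string; value: V }>,
  initializer: InitializerFn<A>,
  aggregator: AggregatorFn<V, A>
): Record<string, A> {
  const table: Record<string, A> = {};
  records.forEach(record => {
    const hasKey = Object.prototype.hasOwnProperty.call(table, record.key);
    const current = hasKey ? table[record.key] : initializer();
    table[record.key] = aggregator(record.key, record.value, current);
  });
  return table;
}

// Sample data, keyed by the email of the user being rated.
const ratingsByEmail = aggregateByKey<RatingRecord, RatingRecord[]>(
  [
    { key: "driver1@example.com", value: { fromEmail: "passenger@example.com", toEmail: "driver1@example.com", score: 5 } },
    { key: "passenger@example.com", value: { fromEmail: "driver1@example.com", toEmail: "passenger@example.com", score: 4 } },
    { key: "driver1@example.com", value: { fromEmail: "other@example.com", toEmail: "driver1@example.com", score: 3 } }
  ],
  () => [],
  (_key, value, aggregate) => [value].concat(aggregate) // prepend, newest first
);

console.log(ratingsByEmail["driver1@example.com"].map(r => r.score));
console.log(ratingsByEmail["passenger@example.com"].map(r => r.score));
```

The resulting object plays the role of the KTable's state store: one entry per email, holding every rating seen so far for that user.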
Walking through Kafka Streams interactive queries
So now that we have gone through how data is produced, and transformed (well actually I did not do too much transformation other than a simple map, but trust me you can), and how we aggregate results from a KStream to a KTable (and its state store), we will move on to see how we can use Kafka interactive queries to query the state stores.
One important concept is that if you used multiple partitions for your original topic, the state may be spread across n-many processing nodes. For this project I have only chosen to use 1 partition, but have written the code to support n-many.
So let's assume that each node could read a different segment of data, or that each node must read from n-many partitions (there is not actually a direct mapping between nodes and partitions; these are 2 must read chapters: elastic-scaling-of-your-application and parallelism-model). We would need each node to expose a REST API to allow its OWN state store to be read. By reading ALL the state stores we are able to get a total view of ALL the persisted data across ALL the partitions. I urge all of you to read this section of the official docs : http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application
This diagram has also been shamelessly stolen from the official docs:
I think this diagram does an excellent job of showing you 3 separate processor nodes, each of which may have a bit of state. ONLY by assembling ALL the data from these nodes are we able to see the ENTIRE dataset.
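Here is a toy model of that situation, purely to illustrate the routing problem that interactive queries solve. It is plain TypeScript with invented names (HostAddress, ProcessorInstance, StreamsCluster, partitionFor), made-up ports and emails, and it assumes one partition per instance. The hash is a simple stand-in and is NOT how Kafka assigns keys to partitions. Each instance only holds the keys that hash to it, so a query for one email has to find the owning instance (local or remote), and only the union of all instances gives the whole dataset.

```typescript
interface HostAddress {
  host: string;
  port: number;
}

interface ProcessorInstance {
  address: HostAddress;
  store: Record<string, number[]>; // email -> rating scores held locally
}

// Stand-in hash for illustration only; Kafka's real partitioner differs.
function partitionFor(key: string, partitionCount: number): number {
  let hash = 0;
  for (let i = 0; i < key.length; i++) {
    hash = (hash * 31 + key.charCodeAt(i)) % 2147483647;
  }
  return hash % partitionCount;
}

class StreamsCluster {
  constructor(private instances: ProcessorInstance[]) {}

  // Which instance would hold this key (assumes one partition per instance).
  instanceForKey(key: string): ProcessorInstance {
    return this.instances[partitionFor(key, this.instances.length)];
  }

  addRating(email: string, score: number): void {
    const owner = this.instanceForKey(email);
    if (!owner.store[email]) owner.store[email] = [];
    owner.store[email].push(score);
  }

  // Query as seen from `local`: `remote` tells us a REST hop would be needed.
  ratingsFor(local: ProcessorInstance, email: string): { remote: boolean; scores: number[] } {
    const owner = this.instanceForKey(email);
    return { remote: owner !== local, scores: owner.store[email] || [] };
  }

  // Only the union over ALL instances gives the ENTIRE key set.
  allEmails(): string[] {
    const emails: string[] = [];
    this.instances.forEach(instance => {
      Object.keys(instance.store).forEach(email => emails.push(email));
    });
    return emails.sort();
  }
}

// Three instances on made-up ports.
const processorInstances: ProcessorInstance[] = [0, 1, 2].map(
  (i): ProcessorInstance => ({ address: { host: "localhost", port: 8080 + i }, store: {} })
);
const cluster = new StreamsCluster(processorInstances);

cluster.addRating("driver1@example.com", 5);
cluster.addRating("driver1@example.com", 3);
cluster.addRating("driver2@example.com", 4);
cluster.addRating("passenger@example.com", 4);

console.log(cluster.ratingsFor(processorInstances[0], "driver1@example.com"));
console.log(cluster.allEmails());
```

In the real code the "which instance owns this key" question is answered by the Kafka Streams metadata, and the "remote" case turns into a REST call to the other node.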
Kafka allows this via metadata about the streams, where we can use the exposed metadata to help us gather the state store data. To do this we first need a MetadataService, which for me is as follows:
package processing.ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.StreamsMetadata
import java.util.stream.Collectors
import entities.HostStoreInfo
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.connect.errors.NotFoundException
import scala.collection.JavaConverters._

/**
 * Looks up StreamsMetadata from KafkaStreams
 */
class MetadataService(val streams: KafkaStreams) {

  /**
   * Get the metadata for all of the instances of this Kafka Streams application
   *
   * @return List of { @link HostStoreInfo}
   */
  def streamsMetadata() : List[HostStoreInfo] = {
    // Get metadata for all of the instances of this Kafka Streams application
    val metadata = streams.allMetadata
    return mapInstancesToHostStoreInfo(metadata)
  }

  /**
   * Get the metadata for all instances of this Kafka Streams application that currently
   * has the provided store.
   *
   * @param store The store to locate
   * @return List of { @link HostStoreInfo}
   */
  def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {
    // Get metadata for all of the instances of this Kafka Streams application hosting the store
    val metadata = streams.allMetadataForStore(store)
    return mapInstancesToHostStoreInfo(metadata)
  }

  /**
   * Find the metadata for the instance of this Kafka Streams Application that has the given
   * store and would have the given key if it exists.
   *
   * @param store Store to find
   * @param key The key to find
   * @return { @link HostStoreInfo}
   */
  def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
    // Get metadata for the instances of this Kafka Streams application hosting the store and
    // potentially the value for key
    val metadata = streams.metadataForKey(store, key, serializer)
    if (metadata == null)
      throw new NotFoundException(
        s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")

    HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
  }

  def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {
    metadatas.stream.map[HostStoreInfo](metadata =>
      HostStoreInfo(
        metadata.host(),
        metadata.port,
        metadata.stateStoreNames.asScala.toList))
      .collect(Collectors.toList())
      .asScala.toList
  }
}
This metadata service is used to obtain the state store information, which we can then use to extract the state data we want (it's a key-value store really).
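Once the metadata tells us which instance owns a key, the only decision left is "read my own store, or ask the owner over REST". The TypeScript sketch below illustrates just that decision. It is an illustration rather than the project code: LookupPlan and planRatingLookup are hypothetical names, storeNames is an assumed field name, and the /ratingByEmail path is simply the one used later in this article.

```typescript
// Hypothetical mirror of the HostStoreInfo entity; storeNames is an assumed field name.
interface HostStoreInfo {
  host: string;
  port: number;
  storeNames: string[];
}

// Either read the local store, or call the instance that owns the key.
type LookupPlan =
  | { kind: "local" }
  | { kind: "remote"; url: string };

// True when the owning instance is the instance we are running on.
function isThisHost(owner: HostStoreInfo, selfHost: string, selfPort: number): boolean {
  return owner.host === selfHost && owner.port === selfPort;
}

// Decide where the ratings for this email should be fetched from.
function planRatingLookup(
  owner: HostStoreInfo,
  selfHost: string,
  selfPort: number,
  email: string
): LookupPlan {
  if (isThisHost(owner, selfHost, selfPort)) {
    return { kind: "local" };
  }
  // The remote call must target the OWNER's host and port, not our own.
  const url = "http://" + owner.host + ":" + owner.port +
    "/ratingByEmail?email=" + encodeURIComponent(email);
  return { kind: "remote", url: url };
}
```

Keeping the decision separate from the actual fetch makes it easy to check in isolation, whatever HTTP client ends up doing the call.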
The next thing we need to do is expose a REST API to allow us to get the state. Let's see that now:
package processing.ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.HostInfo
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import entities.AkkaHttpEntitiesJsonFormats._
import entities._
import stores.StateStores
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import org.apache.kafka.common.serialization.Serdes
import scala.concurrent.{Await, ExecutionContext, Future}
import akka.http.scaladsl.unmarshalling.Unmarshal
import spray.json._
import scala.util.{Failure, Success}
import org.apache.kafka.streams.state.QueryableStoreTypes
import scala.concurrent.duration._

object RestService {
  val DEFAULT_REST_ENDPOINT_HOSTNAME = "localhost"
}

class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {

  val metadataService = new MetadataService(streams)
  var bindingFuture: Future[Http.ServerBinding] = null

  implicit val system = ActorSystem("rating-system")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  def start() : Unit = {
    val emailRegexPattern = """\w+""".r
    val storeNameRegexPattern = """\w+""".r

    val route =
      path("ratingByEmail") {
        get {
          parameters('email.as[String]) { (email) =>
            try {
              // Find out which instance owns the store partition for this key
              val host = metadataService.streamsMetadataForStoreAndKey[String](
                StateStores.RATINGS_BY_EMAIL_STORE,
                email,
                Serdes.String().serializer()
              )

              var future:Future[List[Rating]] = null

              //store is hosted on another process, REST Call
              if(!thisHost(host))
                future = fetchRemoteRatingByEmail(host, email)
              else
                future = fetchLocalRatingByEmail(email)

              val ratings = Await.result(future, 20 seconds)
              complete(ratings)
            }
            catch {
              // Any failure is reported to the caller as an empty list
              case (ex: Exception) => {
                val finalList:List[Rating] = scala.collection.immutable.List[Rating]()
                complete(finalList)
              }
            }
          }
        }
      } ~
      path("instances") {
        get {
          complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
        }
      }~
      path("instances" / storeNameRegexPattern) { storeName =>
        get {
          complete(ToResponseMarshallable.apply(metadataService.streamsMetadataForStore(storeName)))
        }
      }

    bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
    println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }))
  }

  def fetchRemoteRatingByEmail(host:HostStoreInfo, email: String) : Future[List[Rating]] = {
    // Call the instance that owns the key (host), not this instance (hostInfo)
    val requestPath = s"http://${host.host}:${host.port}/ratingByEmail?email=${email}"
    println(s"Client attempting to fetch from online at ${requestPath}")

    val responseFuture: Future[List[Rating]] = {
      Http().singleRequest(HttpRequest(uri = requestPath))
        .flatMap(response => Unmarshal(response.entity).to[List[Rating]])
    }

    responseFuture
  }

  def fetchLocalRatingByEmail(email: String) : Future[List[Rating]] = {
    val ec = ExecutionContext.global

    println(s"client fetchLocalRatingByEmail email=${email}")

    val host = metadataService.streamsMetadataForStoreAndKey[String](
      StateStores.RATINGS_BY_EMAIL_STORE,
      email,
      Serdes.String().serializer()
    )

    val f = StateStores.waitUntilStoreIsQueryable(
      StateStores.RATINGS_BY_EMAIL_STORE,
      QueryableStoreTypes.keyValueStore[String,List[Rating]](),
      streams
    ).map(_.get(email))(ec)

    // A missing key comes back as null, which we report as an empty list
    val mapped = f.map(rating => {
      if (rating == null)
        List[Rating]()
      else
        rating
    })

    mapped
  }

  def stop() : Unit = {
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }

  def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
    hostStoreInfo.host.equals(hostInfo.host()) &&
      hostStoreInfo.port == hostInfo.port
  }
}
With that final class we are able to run the application and query it using the url http://localhost:8080/ratingByEmail?email=sacha@here.com (the key to the Kafka store here is sacha@here.com, and the value is either an empty list or a List[Rating] serialized as JSON). The results are shown below, after we have run the producer and used Chrome (or any other REST tool of your choosing) to fetch them.
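If you are consuming that JSON from code rather than eyeballing it in Chrome, it is worth checking the shape before trusting it. Here is a small TypeScript sketch of that check. It assumes the JSON field names match the Rating class used by the React page later on (fromEmail, toEmail, score); parseRatings and isRating are hypothetical helpers, not part of the project.

```typescript
// Assumed JSON shape of one rating, matching the front end's Rating class.
interface Rating {
  fromEmail: string;
  toEmail: string;
  score: number;
}

// Runtime check that an arbitrary parsed value really looks like a Rating.
function isRating(value: any): value is Rating {
  return value !== null
    && typeof value === "object"
    && typeof value.fromEmail === "string"
    && typeof value.toEmail === "string"
    && typeof value.score === "number";
}

// Parse the response body of /ratingByEmail into a typed list.
// An empty JSON array is a perfectly valid answer (no ratings yet).
function parseRatings(body: string): Rating[] {
  const parsed: any = JSON.parse(body);
  if (!Array.isArray(parsed)) {
    throw new Error("Expected a JSON array of ratings");
  }
  const ratings: Rating[] = [];
  for (const item of parsed) {
    if (!isRating(item)) {
      throw new Error("Malformed rating in response");
    }
    ratings.push(item);
  }
  return ratings;
}
```

For example, parseRatings("[]") gives back an empty array, while a body that is not an array is rejected with an error.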
REST Endpoint Facade
So we just created an Akka Http REST endpoint to serve up the combined Rating(s) that have been pushed through the Kafka stream processing rating topic. However, we have this Play framework API which we use for all other REST endpoints. So I have chosen to create a façade endpoint in the Play backend that will simply call out to the existing Akka Http endpoint. Keeping all the traffic in one place is a nice thing if you ask me. So let's look at the Play code that does this.
We obviously need a new route, which is as follows:
GET /rating/byemail controllers.RatingController.ratingByEmail()
Controller Action
To serve this new route we need a new Action in the RatingController. This is shown below:
package controllers

import javax.inject.Inject

import entities.RatingJsonFormatters._
import entities._
import actors.rating.RatingProducerActor
import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import play.api.libs.json._
import play.api.libs.json.Json
import play.api.libs.json.Format
import play.api.libs.json.JsSuccess
import play.api.libs.json.Writes
import play.api.libs.ws._
import play.api.mvc.{Action, Controller}
import utils.{Errors, Settings}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import scala.concurrent.duration._

class RatingController @Inject()
(
  implicit actorSystem: ActorSystem,
  ec: ExecutionContext,
  ws: WSClient
) extends Controller
{
  //Error handling for streams
  //http://doc.akka.io/docs/akka/2.5.2/scala/stream/stream-error.html
  val decider: Supervision.Decider = {
    case _ => Supervision.Restart
  }

  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

  val childRatingActorProps = Props(classOf[RatingProducerActor],mat,ec)
  val rand = new Random()
  val ratingSupervisorProps = BackoffSupervisor.props(
    Backoff.onStop(
      childRatingActorProps,
      childName = s"RatingProducerActor_${rand.nextInt()}",
      minBackoff = 3.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    ).withSupervisorStrategy(
      OneForOneStrategy() {
        case _ => SupervisorStrategy.Restart
      })
  )

  val ratingSupervisorActorRef =
    actorSystem.actorOf(ratingSupervisorProps, name = "ratingSupervisor")

  def submitNewRating = Action.async(parse.json) { request =>
    Json.fromJson[Rating](request.body) match {
      case JsSuccess(newRating, _) => {
        ratingSupervisorActorRef ! newRating
        Future.successful(Ok(Json.toJson(newRating.copy(toEmail = newRating.toEmail.toUpperCase))))
      }
      case JsError(errors) =>
        Future.successful(BadRequest(
          "Could not build a Rating from the json provided. " + Errors.show(errors)))
    }
  }

  def ratingByEmail = Action.async { request =>
    val email = request.getQueryString("email")
    email match {
      case Some(emailAddress) => {
        // Forward the query to the existing Akka Http endpoint
        val url = s"http://${Settings.ratingRestApiHostName}:${Settings.ratingRestApiPort}/ratingByEmail?email=${emailAddress}"
        ws.url(url).get().map {
          response => (response.json).validate[List[Rating]]
        }.map(x => Ok(Json.toJson(x.get)))
      }
      case None => {
        Future.successful(BadRequest(
          "ratingByEmail endpoint MUST be supplied with a non empty 'email' query string value"))
      }
    }
  }
}
The main things to note here are:
- We use the Play ws (web services) library to issue a GET request against the existing Akka Http endpoint, thus creating our façade.
- We are still using Future to keep it nice and async.
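Stripped of the Play specifics, the façade boils down to one decision: reject the request if there is no email, otherwise forward it to the upstream endpoint. The TypeScript sketch below shows only that decision, as a pure function. FacadeDecision and planRatingByEmail are hypothetical names, the host and port are passed in rather than read from Settings, and unlike the controller above this sketch also treats a blank email as a bad request and URL-encodes the value, which are my own assumptions.

```typescript
// The two things the façade can do with an incoming request.
type FacadeDecision =
  | { kind: "badRequest"; message: string }
  | { kind: "forward"; url: string };

// Decide how to handle GET /rating/byemail given its query string values.
function planRatingByEmail(
  query: { [name: string]: string | undefined },
  upstreamHost: string,
  upstreamPort: number
): FacadeDecision {
  const email = query["email"];
  // Assumption: a blank value is as useless as a missing one, so reject both.
  if (email === undefined || email.trim().length === 0) {
    return {
      kind: "badRequest",
      message: "ratingByEmail endpoint MUST be supplied with a non empty 'email' query string value"
    };
  }
  // Forward to the upstream Akka Http endpoint, encoding the email for safe use in a URL.
  const url = "http://" + upstreamHost + ":" + upstreamPort +
    "/ratingByEmail?email=" + encodeURIComponent(email);
  return { kind: "forward", url: url };
}
```

The caller would then either return a 400 with the message, or issue the GET against the returned url and relay the JSON it gets back.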
React front end for ratings
This is the final result for the View Rating React page. I think it's all fairly self-explanatory. I guess the only bit that is really of any note is that we use lodash _.sumBy(..) to do the summing up of the Ratings for this user to create an overall rating. The rest is standard jQuery/React stuff.
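In case the summing step is not obvious, here is roughly what that lodash call works out, written as a plain reduce in TypeScript. This is only a sketch for illustration: overallRating is a hypothetical helper, and the page itself uses _.sumBy as described above.

```typescript
// Same shape as the Rating class used by the View Rating page.
interface Rating {
  fromEmail: string;
  toEmail: string;
  score: number;
}

// Add up the score of every rating; an empty list gives an overall rating of 0.
function overallRating(ratings: Rating[]): number {
  return ratings.reduce(function (total, rating) {
    return total + rating.score;
  }, 0);
}

// Two people have rated this user, giving 3 and 5.
const sampleRatings: Rating[] = [
  { fromEmail: "bob@here.com", toEmail: "sacha@here.com", score: 3 },
  { fromEmail: "amy@here.com", toEmail: "sacha@here.com", score: 5 }
];

const sampleOverall = overallRating(sampleRatings); // 3 + 5 = 8
```

Note that this is a running total rather than an average, which matches how the page builds its overall rating.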
import * as React from "react";
import * as ReactDOM from "react-dom";
import * as _ from "lodash";
import { OkDialog } from "./components/OkDialog";
import 'bootstrap/dist/css/bootstrap.css';
import {
    Well,
    Grid,
    Row,
    Col,
    Label,
    ButtonInput
} from "react-bootstrap";
import { AuthService } from "./services/AuthService";
import { hashHistory } from 'react-router';

class Rating {
    fromEmail: string
    toEmail: string
    score: number

    constructor(fromEmail, toEmail, score) {
        this.fromEmail = fromEmail;
        this.toEmail = toEmail;
        this.score = score;
    }
}

export interface ViewRatingState {
    ratings: Array<Rating>;
    overallRating: number;
    okDialogOpen: boolean;
    okDialogKey: number;
    okDialogHeaderText: string;
    okDialogBodyText: string;
    wasSuccessful: boolean;
}

export class ViewRating extends React.Component<undefined, ViewRatingState> {

    private _authService: AuthService;

    constructor(props: any) {
        super(props);
        this._authService = props.route.authService;
        if (!this._authService.isAuthenticated()) {
            hashHistory.push('/');
        }
        this.state = {
            overallRating: 0,
            ratings: Array(),
            okDialogHeaderText: '',
            okDialogBodyText: '',
            okDialogOpen: false,
            okDialogKey: 0,
            wasSuccessful: false
        };
    }

    loadRatingsFromServer = () => {
        var self = this;
        var currentUserEmail = this._authService.userEmail();

        $.ajax({
            type: 'GET',
            url: 'rating/byemail?email=' + currentUserEmail,
            contentType: "application/json; charset=utf-8",
            dataType: 'json'
        })
        .done(function (jdata, textStatus, jqXHR) {
            console.log("result of GET rating/byemail");
            console.log(jqXHR.responseText);
            let ratingsObtained = JSON.parse(jqXHR.responseText);
            self.setState(
                {
                    overallRating: _.sumBy(ratingsObtained, 'score'),
                    ratings: ratingsObtained
                });
        })
        .fail(function (jqXHR, textStatus, errorThrown) {
            self.setState(
                {
                    okDialogHeaderText: 'Error',
                    okDialogBodyText: 'Could not load Ratings',
                    okDialogOpen: true,
                    okDialogKey: Math.random()
                });
        });
    }

    componentDidMount() {
        this.loadRatingsFromServer();
    }

    render() {
        var rowComponents = this.generateRows();

        return (
            <Well className="outer-well">
                <Grid>
                    <Row className="show-grid">
                        <Col xs={6} md={6}>
                            <div>
                                <h4>YOUR OVERALL RATING <Label>{this.state.overallRating}</Label></h4>
                            </div>
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <h6>The finer details of your ratings are shown below</h6>
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <Col xs={10} md={6}>
                            <div className="table-responsive">
                                <table className="table table-striped table-bordered table-condensed factTable">
                                    <thead>
                                        <tr>
                                            <th>Rated By</th>
                                            <th>Rating Given</th>
                                        </tr>
                                    </thead>
                                    <tbody>
                                        {rowComponents}
                                    </tbody>
                                </table>
                            </div>
                        </Col>
                    </Row>
                    <Row className="show-grid">
                        <span>
                            <OkDialog
                                open= {this.state.okDialogOpen}
                                okCallBack= {this._okDialogCallBack}
                                headerText={this.state.okDialogHeaderText}
                                bodyText={this.state.okDialogBodyText}
                                key={this.state.okDialogKey}/>
                        </span>
                    </Row>
                </Grid>
            </Well>
        )
    }

    _okDialogCallBack = () => {
        this.setState(
            {
                okDialogOpen: false
            });
    }

    generateRows = () => {
        return this.state.ratings.map(function (item) {
            return <tr key={item.fromEmail}>
                <td>{item.fromEmail}</td>
                <td>{item.score}</td>
            </tr>;
        });
    }
}
Conclusion
This was certainly a challenging thing to write, and I am honestly pleased that I got it done. I have had a really good time writing this, and it has been a great project for self-improvement. I would recommend this type of thing as a great use of time. Go on, find yourself a pet project.
License
This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)