Spatial Search with Cassandra & DataStax Enterprise

I was asked recently at a meetup whether or not Cassandra could be used for spatial data storage. My answer was a definite yes, but with the proviso that you need some special sauce to do spatial queries on your row data. Technologies that spring to mind are SolrCloud and ElasticSearch. DataStax Enterprise also has its own flavour of integrated Solr, so I decided to put together a brief tutorial to try it out. Note: this tutorial assumes a basic knowledge of CQL, Cassandra and Solr.

I’ll keep the use case simple; it’s a common one for spatial data. I want to put points of interest (POIs) on a map and let people comment on them or add new ones. In terms of querying, as I pan around the map I want it to update, showing me everything within a certain radius of the centre. If a user clicks on a POI, they can see a time series view of all comments for that location.

The comments part is simple with Cassandra, as it eats time series use cases for breakfast, but the spatial search is a little more involved. In the real world, if I can help it, I don’t want to manage a search index in a system separate from my record store, as that increases complexity: the data has to be managed in two places.

Note: DSE is free for development purposes, but you pay a license to use it in production. If you want to do this with open source software, then ElasticSearch or SolrCloud are the more prominent choices (see a balanced comparison of them here). With these systems, you have to manage syncing of data between your cluster and search index yourself. This can be quite a challenge, as it introduces more failure scenarios and raises the question of whether an update is truly done before it is in both systems.

With DSE you can run Solr search nodes within a cluster which are kept in sync with specific column families (now tables), so a write to the cluster results in both a classic Cassandra write and a Solr index update. DSE manages this for you. Details on Solr integration and starting a node/cluster with search enabled are here; always a good idea to RTM before using any feature! Furthermore, when you write data via the Solr API it gets reflected in the Cassandra table, and conversely, when you write data via a Cassandra client, the Solr index gets updated automatically. This means you have plenty of options in terms of your client implementation.
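As a quick reference, on a tarball install starting a search-enabled node comes down to a single flag (package installs set SOLR_ENABLED=1 in /etc/default/dse instead; check the DSE docs for your version):

$MYDSEDIR/bin/dse cassandra -s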

Schemas & Data

For test data, you can source an RDF file of all the geocoded points of interest on Wikipedia here, in the “Geographic Coordinates” section. It is available in multiple languages. I used the IRI resource format as opposed to URI; it doesn’t really matter, as you can always play with the link encodings yourself. There are about 450,000 POIs, and it only takes a few minutes to insert and index them into Cassandra/Solr.

Next, create a keyspace and some tables for the points of interest and the comments via CQL. This is a basic demo, so I’m not too worried about having every field in there, and I’ve left out the concept of a user for the moment. RF will be higher in a real environment. Mixed search/analytics and real-time node environments are set up using virtual clusters, so you would also be using the NetworkTopologyStrategy in a real environment (see the example after the next block).

CREATE KEYSPACE geo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE pois (id uuid, label varchar, iri varchar, latlng varchar, PRIMARY KEY(id));
CREATE TABLE poiComments (poiId uuid, commentId timeuuid, content varchar, PRIMARY KEY(poiId, commentId)) WITH CLUSTERING ORDER BY (commentId DESC);
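For reference, in a real multi-datacenter deployment with a dedicated search datacenter the keyspace definition would look something like the following (the datacenter names follow DSE’s DseSimpleSnitch workload naming; the replica counts are illustrative assumptions):

CREATE KEYSPACE geo WITH replication = {'class': 'NetworkTopologyStrategy', 'Cassandra': 3, 'Solr': 2};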

We now need to tell Solr that we want to index any data put into the geo.pois table. We can modify the schema and configuration loading script from the wikipedia demo available in the DSE distribution directory.

cd $MYDSEDIR/demos/wikipedia
cp set-solr-options.sh 1-add-schema.sh schema.xml solrconfig.xml $MYPROJDIR

Edit your schema file to include the geospatial types. You can leave solrconfig.xml as it is. Your schema.xml file should look like the following.

<?xml version="1.0" encoding="UTF-8"?>
<schema name="pois" version="1.1">
   <types>
      <fieldType name="string" class="solr.TextField" />
      <fieldType name="text" class="solr.TextField">
         <analyzer>
            <tokenizer class="solr.StandardTokenizerFactory" />
         </analyzer>
      </fieldType>
      <fieldType name="coord" class="solr.LatLonType" subFieldSuffix="_coordinate" />
      <fieldType name="tdouble" class="solr.TrieDoubleField" precisionStep="8" positionIncrementGap="0" />
   </types>
   <fields>
      <field name="id" type="string" indexed="true" stored="true" />
      <field name="label" type="text" indexed="true" stored="true" />
      <field name="iri" type="text" indexed="true" stored="true" />
      <field name="latlng" type="coord" indexed="true" stored="true" />
      <dynamicField name="*_coordinate" type="tdouble" indexed="true" stored="false" />
   </fields>
   <defaultSearchField>label</defaultSearchField>
   <uniqueKey>id</uniqueKey>
</schema>

What to set as the default search field depends on how you are querying. The point to note in the schema above is that we are storing a solr.LatLonType field, which indexes each coordinate into an associated dynamic field (latlng_0_coordinate and latlng_1_coordinate here) of type solr.TrieDoubleField. Under the hood this type is stored as UTF8Type in Cassandra.

Open the 1-add-schema.sh file and where you see any reference to ‘solr.wiki’, replace it with ‘geo.pois’. Then execute the following:

./1-add-schema.sh

You should hopefully see several success messages telling you your config has been accepted and your schema has been created. From this point on, inserts/updates to the geo.pois table in Cassandra will automatically be reflected in the Solr index. If you want to check that a Solr core has been successfully created, pop open a web browser, go to http://$MYHOST:8983/solr/ and browse to the geo.pois core. If you update your schema later you need to issue a RELOAD command; check the 1-add-schema.sh script for the curl command and just replace the action with RELOAD.
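For convenience, the reload comes down to a call against the standard Solr core admin API, something like this (host and core name as set up above):

curl "http://$MYHOST:8983/solr/admin/cores?action=RELOAD&name=geo.pois"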

Now you need to write a simple import script in Python to pull in your POI data. A tip is to use RDFLib, a Python library for parsing RDF files. Mine was really quick and dirty, so rather than post it in full, below is a cleaned-up sketch of its core (the input file name and N-Triples format are assumptions; adjust for the dump you downloaded, and add whatever batching or validation/logging code you want).

import rdflib
import uuid
import cql

# Grab a connection to Cassandra (Thrift port, CQL3)
keyspace = "geo"
con = cql.connect("127.0.0.1", 9160, keyspace, cql_version='3.0.0')
cursor = con.cursor()

# The geo vocabulary used by the DBpedia coordinate dumps
WGS84 = rdflib.Namespace("http://www.w3.org/2003/01/geo/wgs84_pos#")

# Parse the RDF dump (assumes the N-Triples serialisation)
graph = rdflib.Graph()
graph.parse("geo_coordinates_en.nt", format="nt")

# Iterate through each unique RDF subject that has a latitude
for subject in set(graph.subjects(WGS84.lat, None)):
    lat = graph.value(subject, WGS84.lat)
    lng = graph.value(subject, WGS84.long)
    if lat is None or lng is None:
        continue
    obj = {}
    obj["id"] = uuid.uuid4()
    # e.g. http://dbpedia.org/resource/Ascot_Park_railway_station
    obj["iri"] = unicode(subject)
    # Parse the label out of the end of the IRI and remove the _ chars
    obj["label"] = obj["iri"].split('/')[4].replace("_", " ")
    # e.g. -34.9912692,138.5612366
    obj["latlng"] = "%s,%s" % (lat, lng)
    # Insert a row; DSE updates the Solr index automatically
    cursor.execute('INSERT INTO pois (id, label, iri, latlng) VALUES (:id, :label, :iri, :latlng)', obj)

# Clean up connections
cursor.close()
con.close()

Comments

We mentioned we want people to be able to comment on a POI, and in our UI we will pull back the last 10 comments when a user clicks on one. When creating the comments table I specified WITH CLUSTERING ORDER BY (commentId DESC), which gives me the reverse time ordering I’m looking for. Below are some examples of how we interact with this table.

1. Inserting comments. We use the id from the POI table as the partition key and the commentId timeuuid as the clustering column.

cqlsh:geo> INSERT INTO poiComments (poiId, commentId, content) VALUES (1998f22b-51f0-4002-854f-f32790e60297, af07d130-1caa-11e3-b968-0800200c9a66, 'Worst airport ever!');
cqlsh:geo> INSERT INTO poiComments (poiId, commentId, content) VALUES (1998f22b-51f0-4002-854f-f32790e60297, b447a8a0-1caa-11e3-b968-0800200c9a66, 'Best airport ever!');

2. Querying comments by POI. Notice how the results come back in reverse time order by default, as desired (dateOf() is used here to render the timeuuids as timestamps).

cqlsh:geo> select dateOf(commentId), content from poiComments where poiId=1998f22b-51f0-4002-854f-f32790e60297 LIMIT 10;
 dateOf(commentid)        | content
--------------------------+---------------------
 2013-09-13 20:28:44+0100 |  Best airport ever!
 2013-09-13 20:28:35+0100 | Worst airport ever!

And that is that. We now have the ability to store potentially millions of comments for a POI without ever needing to read the entire row at once.
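As an aside, paging further back through the comments is just a range query on the clustering column. A sketch, using the oldest commentId from the previous page as the cursor:

cqlsh:geo> select commentId, content from poiComments where poiId=1998f22b-51f0-4002-854f-f32790e60297 and commentId < af07d130-1caa-11e3-b968-0800200c9a66 LIMIT 10;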

Querying & Spatial Search

With the POI data imported you should expect to see the following when you query via CQL.

cqlsh:geo> select id,label,iri,latlng from pois limit 4;
 id                                   | label                       | iri                                                     | latlng
--------------------------------------+-----------------------------+---------------------------------------------------------+--------------------------------------
 dd01145b-542b-4228-b4e4-9a30434f13c0 |  Ascot Park railway station |  http://dbpedia.org/resource/Ascot_Park_railway_station |              -34.9912692,138.5612366
 c35a27fc-dac1-4276-90e7-f91b6761f184 | Science Museum at Wroughton | http://dbpedia.org/resource/Science_Museum_at_Wroughton |         51.51225,-1.8127777777777778
 5f0ec39f-77a1-42cf-ae4d-cff92924630e |       Netaji Indoor Stadium |       http://dbpedia.org/resource/Netaji_Indoor_Stadium | 22.566127777777776,88.34166666666667
 2d890ba7-5bd7-4c56-99da-d9012acc5f1b |              Gare de Troyes |              http://dbpedia.org/resource/Gare_de_Troyes | 48.295833333333334,4.064722222222223

If you do a SELECT * query, you will see that DSE has added some additional columns to the table. This is most likely for versioning, but I would still question why they need to be there for each row; I’m not referring to _docBoost or _dynFld, but rather the other ones. If anyone knows the answer, DM me.

The spatial queries are straightforward. To begin, navigate your web browser to http://$MYHOST:8983/solr/#/geo.pois/query.

The important field to use is the filter query (fq) field. In our case we want to use the geofilt function. It takes the form:

{!geofilt pt=45.15,-93.85 sfield=latlng d=5}

Here pt is the point, sfield is the spatial search field (our solr.LatLonType coords) and d is the radius in kilometres, in my case 5 km. It is a float, so you can also do sub-1 km radius searches.
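As an aside, Solr also provides a bbox filter that takes the same parameters but matches a square bounding box rather than a circle, which is cheaper to compute at the cost of some precision:

{!bbox pt=45.15,-93.85 sfield=latlng d=5}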

The query parameter remains *:*, which in Solr parlance matches any value in any field, i.e. all documents.


In our demo application we have a few different types of queries:

1) Give me all POIs near where the map is centred. Usually the radius is determined based on the zoom level.


http://localhost:8983/solr/geo.pois/select?q=*:*&fq={!geofilt pt=45.15,-93.85 sfield=latlng d=5}&wt=json

{
  "responseHeader":{
    "status":0,
    "QTime":260,
    "params":{
      "indent":"true",
      "q":"*:*",
      "wt":"json",
      "fq":"{!geofilt pt=45.15,-93.85 sfield=latlng d=5}"}},
  "response":{"numFound":4,"start":0,"docs":[
      {
        "id":"d92dae60-626f-468c-b414-eef3e4faebc9",
        "iri":"http://dbpedia.org/resource/Buffalo,_Minnesota",
        "label":"Buffalo, Minnesota",
        "latlng":"45.17194444444444,-93.87472222222222"},
      {
        "id":"1998f22b-51f0-4002-854f-f32790e60297",
        "iri":"http://dbpedia.org/resource/Buffalo_Municipal_Airport_(Minnesota)",
        "label":"Buffalo Municipal Airport (Minnesota)",
        "latlng":"45.159166666666664,-93.84333333333333"},
      {
        "id":"30abd9e0-8766-4cdd-8c9e-9834c8c4bbb9",
        "iri":"http://dbpedia.org/resource/Dickinson,_Minnesota",
        "label":"Dickinson, Minnesota",
        "latlng":"45.117777777777775,-93.81194444444445"},
      {
        "id":"43730baf-f3e8-4883-b597-588293133909",
        "iri":"http://dbpedia.org/resource/Buffalo_High_School_(Buffalo,_Minnesota)",
        "label":"Buffalo High School (Buffalo, Minnesota)",
        "latlng":"45.1824869,-93.829996"}]
  }}

2) Allow me to find any POI by its label and centre the map there.

http://localhost:8983/solr/geo.pois/select?q=Airport&wt=json

{
  "responseHeader":{
    "status":0,
    "QTime":83,
    "params":{
      "indent":"true",
      "q":"Airport",
      "wt":"json"}},
  "response":{"numFound":5396,"start":0,"docs":[
      {
        "id":"78a00940-393b-4be7-bf21-17e485d9fb4b",
        "iri":"http://dbpedia.org/resource/Airport/Facility_Directory",
        "label":"Airport",
        "latlng":"34.942,-90.775"},
      {
        "id":"ee8b02d5-eb65-4338-abe7-4d449c2d2760",
        "iri":"http://dbpedia.org/resource/Ukunda_Airport",
        "label":"Ukunda Airport",
        "latlng":"-4.296944444444445,39.57138888888889"},
...

The above query will match any document with a label field matching Airport; we can then pluck out the lat/lng value of the most relevant result and centre our map there.

3) If you want to search for a particular POI within a particular area, you combine the two above: use the default query field (label in our case) but also supply the fq parameter, as shown below.
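Combining the two earlier examples gives:

http://localhost:8983/solr/geo.pois/select?q=Airport&fq={!geofilt pt=45.15,-93.85 sfield=latlng d=5}&wt=json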

Tip: For the spatial queries, if you want the results to come back sorted by distance from a particular point, add a sort parameter of the form &sort=geodist() asc. Be careful though, as you also need to add the sfield, d and pt parameters to the query string. You will end up with:

http://localhost:8983/solr/geo.pois/select?q=Airport&fq={!geofilt}&sort=geodist() asc&wt=json&indent=true&spatial=true&pt=45.15,-93.85&sfield=latlng&d=5

Working with Clients

Once we have our schema set up, it is time to decide how we are going to interact with our Solr nodes. Let’s look at the solr_query extension in CQL first.

cqlsh:geo> select id,label,latlng from pois where solr_query='{!geofilt sfield=latlng pt=45.15,-93.85 d=5}' limit 10;
 id                                   | label                                    | latlng
--------------------------------------+------------------------------------------+---------------------------------------
 43730baf-f3e8-4883-b597-588293133909 | Buffalo High School (Buffalo, Minnesota) |                 45.1824869,-93.829996
 30abd9e0-8766-4cdd-8c9e-9834c8c4bbb9 |                     Dickinson, Minnesota | 45.117777777777775,-93.81194444444445
 1998f22b-51f0-4002-854f-f32790e60297 |    Buffalo Municipal Airport (Minnesota) | 45.159166666666664,-93.84333333333333
 d92dae60-626f-468c-b414-eef3e4faebc9 |                       Buffalo, Minnesota |  45.17194444444444,-93.87472222222222

In this case we merely specify a solr_query in the WHERE clause of a normal statement and use the DataStax Java driver to query our search nodes. The only problem with this approach is that I can’t sort by distance from the centre point. I wouldn’t use this method in practice, but I’m calling it out here for demonstration purposes.

The other option is to use Solrj for querying. This is my preferred option, as it gives us more complete Solr query features. When connecting to a Solr node in the cluster, DSE automagically routes you to one of the Solr nodes, providing load balancing across them. This means the Solrj client should work seamlessly with our cluster: specify a single IP/host and let DSE handle the routing thereafter.

Typically we would formulate a query in Java in the following way.

SolrQuery query = new SolrQuery();
query.setQuery( "*:*" );                // match all documents...
query.addFilterQuery("{!geofilt sfield=latlng pt=45.15,-93.85 d=5}"); // ...within 5 km of the point
query.addSort("geodist()", ORDER.asc);  // nearest first
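For completeness, here is a minimal sketch of running that query end to end with Solrj 4.x (the host, core URL and result handling are assumptions based on the setup above, not the definitive wiring):

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrQuery.ORDER;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

public class NearbyPois {
    public static void main(String[] args) throws SolrServerException {
        // Point Solrj at the geo.pois core; DSE load balances from here
        HttpSolrServer solr = new HttpSolrServer("http://localhost:8983/solr/geo.pois");

        SolrQuery query = new SolrQuery();
        query.setQuery("*:*");
        query.addFilterQuery("{!geofilt sfield=latlng pt=45.15,-93.85 d=5}");
        query.addSort("geodist()", ORDER.asc);
        // geodist() reads these from the request params when sorting
        query.set("sfield", "latlng");
        query.set("pt", "45.15,-93.85");

        QueryResponse rsp = solr.query(query);
        for (SolrDocument doc : rsp.getResults()) {
            System.out.println(doc.getFieldValue("label") + " @ " + doc.getFieldValue("latlng"));
        }
    }
}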

As a note, any write to a table via CQL, or to our index via Solr, automatically gets propagated to both. So it may be simplest to use Solrj for querying only and the standard DataStax driver for writing data, but the option is there to do either; a sketch of the write side follows.
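A minimal sketch of the write side with the DataStax Java driver (the contact point and prepared statement here are assumptions; the Solr index picks the row up automatically):

import java.util.UUID;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

public class PoiWriter {
    public static void main(String[] args) {
        // Connect to any node in the cluster
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect("geo");

        PreparedStatement ps = session.prepare(
            "INSERT INTO pois (id, label, iri, latlng) VALUES (?, ?, ?, ?)");
        session.execute(ps.bind(UUID.randomUUID(),
            "Ascot Park railway station",
            "http://dbpedia.org/resource/Ascot_Park_railway_station",
            "-34.9912692,138.5612366"));

        cluster.shutdown();
    }
}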

Tip: Near-realtime soft-commit indexing is not enabled by default in DSE, but you can turn it on. If you are going to be writing new rows at high velocity, you might consider using it. See here.
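For reference, in stock Solr this is controlled by the autoSoftCommit setting in solrconfig.xml; a sketch of what a one second soft-commit window typically looks like (check the DSE docs for the exact knob in your version):

<updateHandler>
  <autoSoftCommit>
    <maxTime>1000</maxTime> <!-- soft commit at most every 1000 ms -->
  </autoSoftCommit>
</updateHandler>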

Conclusion

Firstly, a quick comment on storage: with my almost half million points of interest inserted, my data directories looked as follows (I cleaned out snapshots and ran compaction first):

$ du -Hs solr.data
350568	solr.data
$ du -Hs geo
216760	geo

The index does come at a storage cost, but that would be the case regardless of the search technology. Given that Lucene stores additional fields for spatial indexing, the ratio above isn’t too scary.

Having implemented geospatial searching before and experienced the complications of managing two disparate data stores, I find the DSE solution elegant and fast to work with; the above took only a couple of hours from importing the data to seeing real results coming back to my API server.

Financially speaking, if you can’t afford the license fee for DSE, then SolrCloud or ElasticSearch may serve, but as you’d expect, you will have to code around scenarios where the two systems are out of sync, or make sure you have a strategy to keep them consistent over time.

About Us: DigBigData offer certified development, administration and training services for Apache Cassandra. Niall Milton is a Systems Architect by trade with a passion for robust and scalable systems.

Disclaimer: We are a DataStax partner but the tech speaks for itself.