was successfully added to your cart.

    Serving Spark NLP via API (2/3): FastAPI and LightPipelines

    This is the second article of the “Serving Spark NLP via API” series, showcasing how to serve Spark NLP using FastAPI and LightPipelines for a quick inference. Don’t forget to check the other articles in this series, namely:

    Background

    Spark NLP is a Natural Language Understanding Library built on top of Apache Spark, leveraging Spark MLLib pipelines, that allows you to run NLP models at scale, including SOTA Transformers. Therefore, it’s the only production-ready NLP platform that allows you to go from a simple PoC on 1 driver node, to scale to multiple nodes in a cluster, to process big amounts of data, in a matter of minutes.

    Before starting, if you want to know more about all the advantages of using Spark NLP (such as the ability to work at scale on air-gapped environments, for instance) we recommend you to take a look at the following resources:

    Motivation

    Spark NLP is server-agnostic, which means it does not come with an integrated API server but offers a lot of options to serve NLP models using Rest APIs.

    There is a wide range of possibilities to add a web server and serve Spark NLP pipelines using RestAPI, and in this series of articles, we are only describing some of them.

    Let’s have an overview of how to use Microsoft’s Synapse ML as an example for that purpose.

    FASTAPI AND SPARK NLP LIGHTPIPELINES

    Fast API serving of Spark NLP pipelines

    FastAPI is, as defined by the creators…

    …a modern, fast (high-performance), web framework for building APIs with Python 3.6+ based on standard Python type hints.

    FastAPI provides very good latency and response times that all along with the good performance of Spark NLP LightPipelines, makes this option the quickest one of the four described in the article.

    Read more about the performance advantages of using *LightPipelines *in this article created by John Snow Labs Data Scientist Lead Veysel Kocaman.

    Strengths

    • Quickest approach
    • Adds flexibility to build and adapt a custom API for your models

    Weaknesses

    • LightPipelines are executed sequentially and don’t leverage the distributed computation that Spark Clusters provide.
    • As an alternative, you can use FastAPI with default pipelines and a custom LoadBalancer, to distribute the calls over your cluster nodes.

    You can serve SparkNLP + FastAPI on Docker. To do that, we will create a project with the following files:

    • Dockerfile: Image for creating a SparkNLP + FastAPI Docker image
    • requirements.txt: PIP Requirements
    • entrypoint.sh: Dockerfile entrypoint
    • content/: folder containing FastAPI webapp and SparkNLP keys
    • content/main.py: FastAPI webapp, entrypoint
    • content/sparknlp_keys.json: SparkNLP keys (for Healthcare or OCR)

    Dockerfile

    The aim of this file is to create a suitable Docker Image with all the OS and Python libraries required to run SparkNLP. Also, adds an entry endpoint for the FastAPI server (see below) and the main folder containing the actual code to run a pipeline on an input text and return the expected values.

        FROM ubuntu:18.04
    RUN apt-get update && apt-get -y update
    
    RUN apt-get -y update \
        && apt-get install -y wget \
        && apt-get install -y jq \
        && apt-get install -y lsb-release \
        && apt-get install -y openjdk-8-jdk-headless \
        && apt-get install -y build-essential python3-pip \
        && pip3 -q install pip --upgrade \
        && apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
                /usr/share/man /usr/share/doc /usr/share/doc-base
    
    ENV PYSPARK_DRIVER_PYTHON=python3
    ENV PYSPARK_PYTHON=python3
    
    ENV LC_ALL=C.UTF-8
    ENV LANG=C.UTF-8
    
    # We expose the FastAPI default port 8515
    EXPOSE 8515
    
    # Install all Python required libraries
    COPY requirements.txt /
    RUN pip install -r /requirements.txt
    
    # Adds the entrypoint to the FastAPI server
    COPY entrypoint.sh /
    RUN chmod +x /entrypoint.sh
    
    # In /content folder we will have our main.py and the license files
    COPY ./content/ /content/
    WORKDIR content/
    
    # We tell Docker to run this file when a container is instantiated
    ENTRYPOINT ["/entrypoint.sh"]

    requirements.txt

    This file describes which Python libraries will be required when creating the Docker image to run Spark NLP on FastAPI.

    **pyspark**==3.1.2
    **fastapi**==0.70.1
    **uvicorn**==0.16
    **wget**==3.2
    **pandas**==1.4.1

    entrypoint.sh

    This file is the entry point of our Docker container, which carries out the following actions:

    1. Takes the sparknlp_keys.json and exports its values as environment variables, as required by Spark NLP for Healthcare.
    2. Installs the proper version of Spark NLP for Healthcare, getting the values from the license keys we have just exported in the previous step.
    3. Runs the main.py file, that will load the pipelines and create and endpoint to serve them.
        #!/bin/bash
    
    # Load the license from sparknlp_keys.json and export the values as OS variables
    export_json () {
    for s in $(echo $values | jq -r 'to_entries|map("\(.key)=\(.value|tostring)")|.[]' $1 ); do
        export $s
    done
    }
    
    export_json "/content/sparknlp_keys.json"
    
    # Installs the proper version of Spark NLP for Healthcare
    pip install --upgrade spark-nlp-jsl==$JSL_VERSION --user --extra-index-url [https://pypi.johnsnowlabs.com/$SECRET](https://pypi.johnsnowlabs.com/$SECRET)
    
    if [ $? != 0 ];
    then
    exit 1
    fi
    
    # Script to create FastAPI endpoints and preloading pipelines for inference
    python3 /content/main.py

    content/main.py: Serving 2 pipelines in a FastAPI endpoint

    To maximize the performance and minimize the latency, we are going to store two Spark NLP pipelines in memory, so that we load only once (at server start) and we just use them everytime we get an API request to infer.

    To do this, let’s create a content/main.py Python script to download the required resources, store them in memory and serve them in Rest API endpoints.

    First, the import section

    import uvicorn, json, os
    from fastapi import FastAPI
    from sparknlp.annotator import *
    from sparknlp_jsl.annotator import *
    from sparknlp.base import *
    import sparknlp, sparknlp_jsl
    from sparknlp.pretrained import PretrainedPipeline
    
    app = FastAPI()
    pipelines = {}

    Then, let’s define the endpoint to serve the pipeline:

    @app.get("/benchmark/pipeline")
    async def get_one_sequential_pipeline_result(modelname, text=''):
    return pipelines[modelname].annotate(text)

    Then, the startup event to preload the pipelines and start a Spark NLP Session:

    @app.on_event("startup")
    async def startup_event():
        with open('/content/sparknlp_keys.json', 'r') as f:
            license_keys = json.load(f)
    
    spark = sparknlp_jsl.start(secret=license_keys['SECRET'])
        
    pipelines['ner_profiling_clinical'] = PretrainedPipeline('ner_profiling_clinical', 'en', 'clinical/models')
        
    pipelines['clinical_deidentification'] = PretrainedPipeline("clinical_deidentification", "en", "clinical/models")

    Finally, let’s run a uvicorn server, listening on port 8515 to the endpoints declared before:

    if __name__ == "__main__":
    uvicorn.run('main:app', host='0.0.0.0', port=8515)

    content/sparknlp_keys.json

    For using Spark NLP for Healthcare, please add your Spark NLP for Healthcare license keys to content/sparknlp_keys.jsonDThe file is ready, you only need to fulfill with your own values taken from the JSON file John Snow Labs has provided you with.

    {
        "AWS_ACCESS_KEY_ID": "",
        "AWS_SECRET_ACCESS_KEY": "",
        "SECRET": "",
        "SPARK_NLP_LICENSE": "",
        "JSL_VERSION": "",
        "PUBLIC_VERSION": ""
    }

    And now, let’s run the server!

    1. Creating the Docker image and running the container

    docker build -t johnsnowlabs/sparknlp:sparknlp_api .
    
    docker run -v jsl_keys.json:/content/sparknlp_keys.json -p 8515:8515 -it johnsnowlabs/sparknlp:sparknlp_api

    2. Consuming the API using a Python script

    Lets import some libraries

    import requests
    import time

    Then, let’s create a clinical note

    ner_text = """
    A 28-year-old female with a history of gestational diabetes mellitus diagnosed eight years prior to presentation and subsequent type two diabetes mellitus ( T2DM ), one prior episode of HTG-induced pancreatitis three years prior to presentation , associated with an acute hepatitis , and obesity with a body mass index ( BMI ) of 33.5 kg/m2 , presented with a one-week history of polyuria , polydipsia , poor appetite , and vomiting. The patient was prescribed 1 capsule of Advil 10 mg for 5 days and magnesium hydroxide 100mg/1ml suspension PO. 
    He was seen by the endocrinology service and she was discharged on 40 units of insulin glargine at night , 12 units of insulin lispro with meals , and metformin 1000 mg two times a day.
    """

    We have preloaded and served two Pretrained Pipelines: clinical_deidentification and ner_profiling_clinical . In modelname, let’s set which one we want to check

    # Change this line to execute any of the two pipelines
    modelname = 'clinical_deidentification'
    # modelname = 'ner_profiling_clinical'

    And finally, let’s use the requestslibrary to send a test request to the endpoint and get the results.

    query = f"?modelname={modelname}&text={ner_text}"
    url = f"http://localhost:8515/benchmark/pipeline{query}"
    
    print(requests.get(url))

    Results (original and deidentified texts in json format)

    >> {
        'masked': ['A  female with a history of gestational diabetes mellitus diagnosed ...],
    
        'obfuscated': ['A 48 female with a history of gestational diabetes mellitus diagnosed ...'],
    
        'ner_chunk': ['28-year-old'],
    
        'sentence': ['A 28-year-old female with a history of gestational diabetes mellitus diagnosed ...']
    }

    You can also prettify the json using the following function with the result of the annotate() function:

    def explode_annotate(ann_result):
    '''
    Function to convert result object to json
    input: raw result
    output: processed result dictionary
    '''
    result = {}
    for column, ann in ann_result[0].items():
        result[column] = []
        for lines in ann:
            content = {
                "result": lines.result,
                "begin": lines.begin,
                "end": lines.end,
                "metadata": dict(lines.metadata),
            }
            result[column].append(content)
    return result

    Do you want to know more?

    How useful was this post?

    Try Healthcare NLP

    See in action
    Our additional expert:
    Juan Martinez is a Sr. Data Scientist, working at John Snow Labs since 2021. He graduated from Computer Engineering in 2006, and from that time on, his main focus of activity has been the application of Artificial Intelligence to texts and unstructured data. To better understand the intersection between Language and AI, he complemented his technical background with a Linguistics degree from Moscow Pushkin State Language Institute in 2012 and later on on University of Alcala (2014). He is part of the Healthcare Data Science team at John Snow Labs. His main activities are training and evaluation of Deep Learning, Semantic and Symbolic models within the Healthcare domain, benchmarking, research and team coordination tasks. His other areas of interest are Machine Learning operations and Infrastructure.

    Rule-Based and Pattern Matching for Entity Recognition in Spark NLP

    Finding patterns and matching strategies are well-known NLP procedures to extract information from text. Spark NLP library has two annotators that can...
    preloader