Streaming JSON

Data held in memory can grow very large, for example when the results from every page of a paginated source are collected into one list before being printed:

# ssva.lt energy performance register

# from pprint import pprint
import re
import json
from dphelper import DPHelper

helper = DPHelper(is_verbose=True)
headers = helper.create_headers(authority='ssva.lt')

def generate_url(page_number, PAGE_SIZE=10000):
    return f'https://www.ssva.lt/registrai/pensreg/pensert_list.php?goto={page_number}&pagesize={PAGE_SIZE}'

PAGINATION_DATA_PATTERN = ('Toliau</a></li>.*?pageNum="(.*?)" >Pabaiga</a>')

DATA_PATTERN = (
    '<tr  id="gridRow.*?class="r-gridrow">'
    '<tddata-record-id=".*?".*?SertifikatoNr".*?val="(.*?)" >' # certificate no.
    '.*?"IsdavimoD".*?val="(.*?)" >' # issue date
    '.*?GaliojimoD".*?val="(.*?)" >' # validity date
    '.*?UnikalusNr".*?val="(.*?)" >' # unique no.
    '.*?Adresas".*?val="(.*?)" >' # address
    '.*?Paskirtis.*?val="(.*?)" >' # purpose
    '.*?PEN".*?val="(.*?)" >' # PEN
    '.*?Ap" >.*?val="(.*?)" >' # heated area
    '.*?Q".*?val="(.*?)" >' # energy consumption
    '.*?Hsrc".*?val="(.*?)" >' # heat source
    '.*?Pastaba".*?val="(.*?)" ></span></span></td></tr>' # note
)

def get_page_count():
    FIRST_PAGE_URL = generate_url(1)
    raw_content = helper.from_url(FIRST_PAGE_URL, headers=headers)
    rg = re.compile(PAGINATION_DATA_PATTERN)
    last_page_nr = rg.findall(raw_content)
    return int(last_page_nr[0])

if __name__ == "__main__":
    last_page_nr = get_page_count()
    # Compile is slow, so we do it once
    rg = re.compile(DATA_PATTERN)

    data = []
    for page_number in range(1, last_page_nr + 1):
        url_for_read = generate_url(page_number)
        # print(url_for_read)
        page_content = helper.from_url(url_for_read, headers=headers)
        results = rg.findall(page_content)
        data.extend(results)
        print(json.dumps(results, indent=2))

    print(json.dumps(data, indent=2))
 

In such scenarios it can be useful to stream the output instead of building it in memory, using streamable_list from the json_stream pip package. Basic example:

import sys
import json

from json_stream import streamable_list

def test_stream():
    for i in range(20):
        yield i
 

# wrap existing iterable
data = streamable_list(test_stream())

# consume iterable with standard json.dump()
json.dump(data, sys.stdout)

Applying this example to the pagination scenario above:

# ssva.lt energy performance register

# from pprint import pprint
import re
import json
import sys
from dphelper import DPHelper


from json_stream import streamable_list

helper = DPHelper(is_verbose=True)
headers = helper.create_headers(authority='ssva.lt')

def generate_url(page_number, PAGE_SIZE=100):
    return f'https://www.ssva.lt/registrai/pensreg/pensert_list.php?goto={page_number}&pagesize={PAGE_SIZE}'

PAGINATION_DATA_PATTERN = ('Toliau</a></li>.*?pageNum="(.*?)" >Pabaiga</a>')

DATA_PATTERN = (
    '<tr  id="gridRow.*?class="r-gridrow">'
    '<tddata-record-id=".*?".*?SertifikatoNr".*?val="(.*?)" >' # certificate no.
    '.*?"IsdavimoD".*?val="(.*?)" >' # issue date
    '.*?GaliojimoD".*?val="(.*?)" >' # validity date
    '.*?UnikalusNr".*?val="(.*?)" >' # unique no.
    '.*?Adresas".*?val="(.*?)" >' # address
    '.*?Paskirtis.*?val="(.*?)" >' # purpose
    '.*?PEN".*?val="(.*?)" >' # PEN
    '.*?Ap" >.*?val="(.*?)" >' # heated area
    '.*?Q".*?val="(.*?)" >' # energy consumption
    '.*?Hsrc".*?val="(.*?)" >' # heat source
    '.*?Pastaba".*?val="(.*?)" ></span></span></td></tr>' # note
)

def get_page_count():
    FIRST_PAGE_URL = generate_url(1)
    raw_content = helper.from_url(FIRST_PAGE_URL, headers=headers)
    rg = re.compile(PAGINATION_DATA_PATTERN)
    last_page_nr = rg.findall(raw_content)
    return int(last_page_nr[0])

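def yield_data():
    last_page_nr = get_page_count()
    # Compile is slow, so we do it once
    rg = re.compile(DATA_PATTERN)

    # To limit the crawl while testing, override the page count here,
    # e.g. last_page_nr = 5

    for page_number in range(1, last_page_nr + 1):
        url_for_read = generate_url(page_number)
        page_content = helper.from_url(url_for_read, headers=headers)
        results = rg.findall(page_content)

        # Yield rows one by one so the full result set is never held in memory
        yield from results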

if __name__ == "__main__":
    # wrap existing iterable
    data = streamable_list(yield_data())

    # consume iterable with standard json.dump()
    json.dump(data, sys.stdout)
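
To make the idea concrete, here is a minimal standard-library sketch of what streaming a JSON array amounts to: each item is serialized as soon as the generator produces it, so the full list never has to sit in memory. This is only an illustration and not how json_stream is implemented; the names write_json_array and numbers are made up for this example.

```python
import io
import json


def write_json_array(items, fp):
    """Write an iterable to fp as a JSON array, one item at a time."""
    fp.write("[")
    for index, item in enumerate(items):
        if index:
            fp.write(", ")
        fp.write(json.dumps(item))
    fp.write("]")


def numbers():
    # Stand-in for yield_data(): any generator works here.
    for i in range(5):
        yield i


buffer = io.StringIO()
write_json_array(numbers(), buffer)
streamed = buffer.getvalue()
print(streamed)
```

In a pipeline, sys.stdout would take the place of the in-memory buffer used here.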

Enforcing data freshness

Sometimes we reuse output from other pipelines using this pattern:

helper = DPHelper(is_verbose=True) 
r = helper.snapshot.get_latest(by_challenge_id=140) 

We do have staleness checks on the pipelines. However, if a pipeline reuses another pipeline's data, the pattern above can silently pass along stale data when the upstream pipeline has stopped updating.

To avoid that situation, dphelper v0.16 added an optional parameter, max_stale_days.

First, make sure you update your pip package locally.

pip3 install dphelper -U

Check your package version:

pip show dphelper

If everything is OK, use this pattern:

helper = DPHelper(is_verbose=True) 
r = helper.snapshot.get_latest(by_challenge_id=140, max_stale_days=14) 
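
Conceptually, a staleness check compares the age of a snapshot with the allowed limit. The sketch below shows the general idea only: ensure_fresh, StaleDataError and the way the creation date is passed in are hypothetical and not part of dphelper, whose actual behaviour (for example, which error it raises) may differ.

```python
from datetime import datetime, timedelta, timezone


class StaleDataError(Exception):
    """Raised when a snapshot is older than the allowed age (hypothetical)."""


def ensure_fresh(created_at, max_stale_days, now=None):
    """Return the snapshot age, or raise if it exceeds max_stale_days."""
    now = now or datetime.now(timezone.utc)
    age = now - created_at
    if age > timedelta(days=max_stale_days):
        raise StaleDataError(
            f"snapshot is {age.days} days old, limit is {max_stale_days}"
        )
    return age


# Fixed dates so the example is reproducible.
now = datetime(2024, 1, 31, tzinfo=timezone.utc)
fresh_age = ensure_fresh(datetime(2024, 1, 25, tzinfo=timezone.utc), 14, now=now)

try:
    ensure_fresh(datetime(2024, 1, 1, tzinfo=timezone.utc), 14, now=now)
    stale_detected = False
except StaleDataError:
    stale_detected = True
```

Failing loudly in this way lets the downstream pipeline stop instead of publishing outdated results.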

Sharing charts

Sometimes we want to visualise data as an infographic, chart or dashboard. For example, this is an interactive chart depicting salary growth rates:

This could also be used in media articles to back content with (live) data.

Infogram tool

We use the Infogram tool for data visualisation because it makes sharing charts easy and has a fairly large collection of chart types.

Example: line chart

We will use the example above (charts) to demonstrate how to build an interactive chart.

Let's build a static chart first. Pick a line chart and add data manually:

To keep it interactive, you will need to use the JSON feeds option.

Important: this works for a static snapshot; live updates (i.e. updating in real time) require the paid version of the app.

The data must be in the proper JSON format: an array nested three levels deep, with sheets as the outer dimension (similar to spreadsheets in Infogram), followed by rows and then columns.

[[["Index of Happiness","09:00","10:00","11:00","12:00","13:00"],["Eden",794,404,301,563,406],["Shambhala",470,940,142,575,294],["Avalon",332,914,860,616,741],["Camelot",351,686,720,447,467],["El Dorado",863,353,612,872,533],["Atlantis",382,882,240,320,420]]]
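
A feed can be checked against this shape before it is published. The sketch below is illustrative only: validate_feed is a hypothetical helper, and the check that all rows in a sheet have the same width is an assumption about what a sensible table looks like, not a documented Infogram requirement.

```python
import json


def validate_feed(feed):
    """Check the sheets > rows > cells nesting and return rows per sheet."""
    if not isinstance(feed, list):
        raise ValueError("feed must be a list of sheets")
    row_counts = []
    for sheet in feed:
        if not isinstance(sheet, list) or not sheet:
            raise ValueError("each sheet must be a non-empty list of rows")
        if not all(isinstance(row, list) for row in sheet):
            raise ValueError("each row must be a list of cells")
        width = len(sheet[0])
        if any(len(row) != width for row in sheet):
            raise ValueError("all rows in a sheet must have the same width")
        row_counts.append(len(sheet))
    return row_counts


# Shortened version of the sample feed above.
sample = json.loads(
    '[[["Index of Happiness","09:00","10:00"],'
    '["Eden",794,404],["Shambhala",470,940]]]'
)
row_counts = validate_feed(sample)
print(row_counts)
```

A common mistake is to pass a single sheet (a list of rows) without the outer list; the check above rejects that case.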

Here is the API link that returns just the data, without other meta information (snapshot creation date, etc.). Use it in the Infogram tool when prompted “Enter JSON feed URL” (replace the challenge ID with your own):

https://api.dataplatform.lt/snapshots/latest/json/?challenge_id=459

You can inspect results here. Here is an example that produces data in the required format on the data platform:


import requests
import json


def get_last_snapshot(challenge_id):
  api_url = 'https://data-platform-backend-4ddpl.ondigitalocean.app/snapshots/latest/?challenge_id=%d' % challenge_id
  response = requests.get(api_url)
  data = response.json()
  return data.get('json_data', [])

payroll_data = get_last_snapshot(237)

location = 'Vilnius'
payroll_data = list(filter(lambda p: p.get('location') == location and p.get('salary') == 'Bruto' and p.get('company_status') == 'With', payroll_data))
payroll_data.sort(key=lambda d: d.get('date'))

results = []
results.append(['Data', 'Alga'])
for item in payroll_data:
  results.append([item.get('date'), item.get('value')])

combined_results = [results]
print(json.dumps(combined_results))

Index and crawling pipelines

For use cases such as listing portals and e-commerce sites (e-shops), the typical pattern is a system with a very large number of pages. Write one pipeline that crawls the entire website and produces URLs, and a second one that populates data using those URLs.

In many cases most of the required information is already available at the indexing stage, so collecting it there is faster. It also puts less load on the target website and reduces the chance of being blocked.

Tip: please refer to the section on how to reuse data from another pipeline in our platform.
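
The two-stage pattern can be sketched as follows. Everything here is made up for illustration: the listing HTML, the regular expression, and the function names index_pipeline, detail_pipeline and fake_fetch. A real pipeline would fetch pages over HTTP and use patterns that match the target website.

```python
import re

# Made-up listing pages standing in for a real website.
LISTING_PAGES = [
    '<a class="item" href="/item/1">Flat A</a>'
    '<a class="item" href="/item/2">Flat B</a>',
    '<a class="item" href="/item/3">Flat C</a>',
]

ITEM_PATTERN = re.compile(r'<a class="item" href="(.*?)">(.*?)</a>')


def index_pipeline(pages):
    """Stage 1: crawl listing pages and emit one record per item URL."""
    for page in pages:
        for url, title in ITEM_PATTERN.findall(page):
            # Keep whatever is already visible in the listing (here: title).
            yield {"url": url, "title": title}


def detail_pipeline(index_records, fetch):
    """Stage 2: visit only the indexed URLs and add detail fields."""
    for record in index_records:
        yield {**record, "details": fetch(record["url"])}


def fake_fetch(url):
    # Placeholder for a real HTTP request.
    return f"content of {url}"


index = list(index_pipeline(LISTING_PAGES))
detailed = list(detail_pipeline(index, fake_fetch))
print(detailed)
```

On the platform, the second stage would read the first stage's output with the snapshot.get_latest pattern shown earlier rather than receiving it as an in-memory list.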

Data from APIs

Sometimes data for a web page is loaded dynamically, i.e. on demand. This can be done for performance reasons, to make the initial load faster. You can sometimes identify this pattern by observing spinners or loaders, or content appearing incrementally:

In the browser, this can be observed in the Network tab of the developer tools. For example, in Chrome, open the menu “View > Developer > Developer Tools” and go to the Network tab.

Watch screencast

Having problems finding the right request? Try searching by data.

Now you can find the API request that yields the required data:

curl https://gedimino37.lt/catalog.php

Which (in this case) would yield structured data:

[{"id":"B0.1","status":"sold","direction":"PR","floor":"0","size":"87,69","rooms":"6"}]

The code snippet in Python to retrieve this kind of data:

import json
from dphelper import DPHelper

helper = DPHelper()
headers = helper.create_headers(authority="gedimino37.lt")

content = helper.from_url('https://gedimino37.lt/catalog.php', headers=headers)
data = json.loads(content)

Or, making the request with the requests library instead of our helper (the helper is still used here to build the headers):

import requests
from dphelper import DPHelper

helper = DPHelper()
headers = helper.create_headers(authority="gedimino37.lt")

r = requests.get('https://gedimino37.lt/catalog.php', headers=headers)
data = r.json()
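
Values in such responses often arrive as strings, so a small normalization step is usually needed before analysis. The sketch below works on a sample shaped like the response above; the normalize function is hypothetical, and it assumes that size uses a decimal comma and that floor and rooms are always whole numbers.

```python
import json

# Sample shaped like the response shown above.
raw = (
    '[{"id":"B0.1","status":"sold","direction":"PR",'
    '"floor":"0","size":"87,69","rooms":"6"}]'
)


def normalize(record):
    """Convert string fields to numbers; assumes a decimal comma in size."""
    return {
        **record,
        "floor": int(record["floor"]),
        "rooms": int(record["rooms"]),
        "size": float(record["size"].replace(",", ".")),
    }


apartments = [normalize(item) for item in json.loads(raw)]
print(apartments)
```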