Skip to content

Examples

Here, we'll have a look at how to use the asynchronous class of geoserverx.

Get started

To start using geoserverx in async mode, create a new instance of the AsyncGeoServerX class.

Setup Class instance

The AsyncGeoServerX class has a default username, password, and URL, which point to the default GeoServer settings.

# Import class from package
from geoserverx._async.gsx import AsyncGeoServerX
import asyncio
# Create a class instance with the default parameters
client = AsyncGeoServerX()
We'll assume a connection to a local GeoServer with default credentials.

Get all workspaces

from geoserverx._async.gsx import AsyncGeoServerX
import asyncio

async def get_info_raster_workspaces(url, username, password):
    """Print the result of ``get_all_workspaces()`` for one GeoServer.

    Despite the "raster" in its name, this helper requests the list of
    all workspaces.

    Args:
        url: GeoServer REST endpoint, passed third to ``AsyncGeoServerX``.
        username: Account name, passed first to ``AsyncGeoServerX``.
        password: Account password, passed second to ``AsyncGeoServerX``.
    """
    # Banner is printed before any awaiting happens, so it marks task start.
    print("-------------start-----------------")
    geoserver = AsyncGeoServerX(username, password, url)
    all_workspaces = await geoserver.get_all_workspaces()
    print(all_workspaces)

async def main():
    """Run three workspace listings concurrently with ``asyncio.gather``."""
    # NOTE(review): the second target uses host 'locahost', no trailing
    # slash and a different password - presumably a deliberate failing
    # connection for the demo; confirm before "fixing" it.
    targets = [
        {'url': 'http://localhost:8080/geoserver/rest/', 'username': 'admin', 'password': 'GeoServer'},
        {'url': 'http://locahost:8080/geoserver/rest', 'username': 'admin', 'password': 'myP'},
        {'url': 'http://localhost:8080/geoserver/rest/', 'username': 'admin', 'password': 'GeoServer'},
    ]
    await asyncio.gather(
        *(get_info_raster_workspaces(**target) for target in targets)
    )


# Script entry point: run main() on a fresh event loop until it completes.
asyncio.run(main())

''' Console 
-------------start-----------------
-------------start-----------------
-------------start-----------------
workspaces=workspaceDict(workspace=[WorkspaceInBulk(name='MySimple', href='http://localhost:8080/geoserver/rest/workspaces/MySimple.json'), WorkspaceInBulk(name='MyHidden', href='http://localhost:8080/geoserver/rest/workspaces/MyHidden.json'), WorkspaceInBulk(name='MyDefault', href='http://localhost:8080/geoserver/rest/workspaces/MyDefault.json'), WorkspaceInBulk(name='nondefaultws', href='http://localhost:8080/geoserver/rest/workspaces/nondefaultws.json'), WorkspaceInBulk(name='mydefaultws', href='http://localhost:8080/geoserver/rest/workspaces/mydefaultws.json'), WorkspaceInBulk(name='ajadasfasdf', href='http://localhost:8080/geoserver/rest/workspaces/ajadasfasdf.json'), WorkspaceInBulk(name='ajada', href='http://localhost:8080/geoserver/rest/workspaces/ajada.json'), WorkspaceInBulk(name='aja', href='http://localhost:8080/geoserver/rest/workspaces/aja.json'), WorkspaceInBulk(name='cesium', href='http://localhost:8080/geoserver/rest/workspaces/cesium.json')])
'''

Get Information about cesium workspace

from geoserverx._async.gsx import AsyncGeoServerX
import asyncio

async def get_info_raster_workspaces(url, username, password,workspace):
    """Print the result of ``get_workspace(workspace)`` for one GeoServer.

    Args:
        url: GeoServer REST endpoint, passed third to ``AsyncGeoServerX``.
        username: Account name, passed first to ``AsyncGeoServerX``.
        password: Account password, passed second to ``AsyncGeoServerX``.
        workspace: Name of the workspace to look up.
    """
    print("-------------start-----------------")
    geoserver = AsyncGeoServerX(username, password, url)
    workspace_info = await geoserver.get_workspace(workspace)
    print(workspace_info)

async def main():
    """Look up the ``cesium`` workspace through ``asyncio.gather``."""
    lookup = get_info_raster_workspaces(
        url='http://localhost:8080/geoserver/rest/',
        username='admin',
        password='GeoServer',
        workspace='cesium',
    )
    await asyncio.gather(lookup)


# Script entry point: run main() on a fresh event loop until it completes.
asyncio.run(main())

''' Console 
-------------start-----------------
workspace=SingleWorkspace(name='cesium', isolated=False, dateCreated='2023-02-13 06:43:28.793 UTC', dataStores='http://localhost:8080/geoserver/rest/workspaces/cesium/datastores.json', coverageStores='http://localhost:8080/geoserver/rest/workspaces/cesium/coveragestores.json', wmsStores='http://localhost:8080/geoserver/rest/workspaces/cesium/wmsstores.json', wmtsStores='http://localhost:8080/geoserver/rest/workspaces/cesium/wmtsstores.json')
'''

Create New workspaces

  • AsyncMyDefault - Default and not Isolated
  • AsyncMyHidden - Not Default and Isolated
  • AsyncMySimple - Not Default and not Isolated
from geoserverx._async.gsx import AsyncGeoServerX
import asyncio

async def create_single_workspaces(url, username, password,workspace,default,isolated):
    """Print the result of ``create_workspace`` for a single workspace.

    Args:
        url: GeoServer REST endpoint, passed third to ``AsyncGeoServerX``.
        username: Account name, passed first to ``AsyncGeoServerX``.
        password: Account password, passed second to ``AsyncGeoServerX``.
        workspace: Name of the workspace to create.
        default: Forwarded as the second argument of ``create_workspace``.
        isolated: Forwarded as the third argument of ``create_workspace``.
    """
    print("-------------start-----------------")
    geoserver = AsyncGeoServerX(username, password, url)
    outcome = await geoserver.create_workspace(workspace, default, isolated)
    print(outcome)

async def main():
    """Create three workspaces concurrently with ``asyncio.gather``."""
    connection = {
        'url': 'http://localhost:8080/geoserver/rest/',
        'username': 'admin',
        'password': 'GeoServer',
    }
    # (workspace name, default flag, isolated flag), in submission order.
    specs = [
        ('AsyncMyDefault', True, False),
        ('AsyncMyHidden', False, True),
        ('AsyncMySimple', False, False),
    ]
    jobs = [
        create_single_workspaces(
            workspace=name, default=is_default, isolated=is_isolated, **connection
        )
        for name, is_default, is_isolated in specs
    ]
    await asyncio.gather(*jobs)


# Script entry point: run main() on a fresh event loop until it completes.
asyncio.run(main())

''' Console 
-------------start-----------------
-------------start-----------------
-------------start-----------------
code=201 response='Data added successfully'
code=201 response='Data added successfully'
code=201 response='Data added successfully'
'''

Get all Vector stores in cesium workspace

from geoserverx._async.gsx import AsyncGeoServerX
import asyncio

async def create_single_workspaces(url, username, password,workspace):
    """Print the result of ``get_vector_stores_in_workspaces(workspace)``.

    Despite its name, this helper creates nothing; it only queries the
    vector stores of the given workspace.

    Args:
        url: GeoServer REST endpoint, passed third to ``AsyncGeoServerX``.
        username: Account name, passed first to ``AsyncGeoServerX``.
        password: Account password, passed second to ``AsyncGeoServerX``.
        workspace: Name of the workspace whose vector stores are requested.
    """
    print("-------------start-----------------")
    geoserver = AsyncGeoServerX(username, password, url)
    vector_stores = await geoserver.get_vector_stores_in_workspaces(workspace)
    print(vector_stores)

async def main():
    """Request the vector stores of ``cesium`` through ``asyncio.gather``."""
    query = create_single_workspaces(
        url='http://localhost:8080/geoserver/rest/',
        username='admin',
        password='GeoServer',
        workspace='cesium',
    )
    await asyncio.gather(query)


# Script entry point: run main() on a fresh event loop until it completes.
asyncio.run(main())

''' Console 
-------------start-----------------
dataStores=DataStoreDict(dataStore=[DataStoreInBulk(name='mygpkgs', href='http://localhost:8080/geoserver/rest/workspaces/cesium/datastores/mygpkgs.json'), DataStoreInBulk(name='myshp', href='http://localhost:8080/geoserver/rest/workspaces/cesium/datastores/myshp.json'), DataStoreInBulk(name='mysql', href='http://localhost:8080/geoserver/rest/workspaces/cesium/datastores/mysql.json')])
'''

Get Information of Vector store myshp in cesium workspace

from geoserverx._async.gsx import AsyncGeoServerX
import asyncio

async def get_info_vector_workspaces(url, username, password,workspace,store):
    """Print the result of ``get_vector_store(workspace, store)``.

    Args:
        url: GeoServer REST endpoint, passed third to ``AsyncGeoServerX``.
        username: Account name, passed first to ``AsyncGeoServerX``.
        password: Account password, passed second to ``AsyncGeoServerX``.
        workspace: Name of the workspace that holds the store.
        store: Name of the vector store to look up.
    """
    print("-------------start-----------------")
    geoserver = AsyncGeoServerX(username, password, url)
    store_info = await geoserver.get_vector_store(workspace, store)
    print(store_info)

async def main():
    """Look up vector store ``myshp`` in ``cesium`` through ``asyncio.gather``."""
    lookup = get_info_vector_workspaces(
        url='http://localhost:8080/geoserver/rest/',
        username='admin',
        password='GeoServer',
        workspace='cesium',
        store='myshp',
    )
    await asyncio.gather(lookup)


# Script entry point: run main() on a fresh event loop until it completes.
asyncio.run(main())


''' Console 
-------------start-----------------
dataStore=DataStoreModelDetails(name='myshp', description=None, enabled=True, workspace=WorkspaceInBulk(name='cesium', href='http://localhost:8080/geoserver/rest/workspaces/cesium.json'), connectionParameters=EntryItem(entry=[DatastoreConnection(key='namespace', path='cesium'), DatastoreConnection(key='url', path='file:/path/to/nyc.shp')]), dateCreated='2023-02-28 18:14:01.199 UTC', dateModified=None, featureTypes='http://localhost:8080/geoserver/rest/workspaces/cesium/datastores/myshp/featuretypes.json')
'''

Get all Raster stores in cesium workspace

from geoserverx._async.gsx import AsyncGeoServerX
import asyncio

async def get_all_raster_workspaces(url, username, password,workspace):
    """Print the result of ``get_raster_stores_in_workspaces(workspace)``.

    Args:
        url: GeoServer REST endpoint, passed third to ``AsyncGeoServerX``.
        username: Account name, passed first to ``AsyncGeoServerX``.
        password: Account password, passed second to ``AsyncGeoServerX``.
        workspace: Name of the workspace whose raster stores are requested.
    """
    print("-------------start-----------------")
    geoserver = AsyncGeoServerX(username, password, url)
    raster_stores = await geoserver.get_raster_stores_in_workspaces(workspace)
    print(raster_stores)

async def main():
    """Request the raster stores of ``cesium`` through ``asyncio.gather``."""
    query = get_all_raster_workspaces(
        url='http://localhost:8080/geoserver/rest/',
        username='admin',
        password='GeoServer',
        workspace='cesium',
    )
    await asyncio.gather(query)


# Script entry point: run main() on a fresh event loop until it completes.
asyncio.run(main())

''' Console 
-------------start-----------------
coverageStores=CoveragesStoresDict(coverageStore=[CoveragesStoreInBulk(name='dem', href='http://localhost:8080/geoserver/rest/workspaces/cesium/coveragestores/dem.json'), CoveragesStoreInBulk(name='dsm', href='http://localhost:8080/geoserver/rest/workspaces/cesium/coveragestores/dsm.json'), CoveragesStoreInBulk(name='ortho', href='http://localhost:8080/geoserver/rest/workspaces/cesium/coveragestores/ortho.json')])
'''

Get Information of Raster store dsm in cesium workspace

from geoserverx._async.gsx import AsyncGeoServerX
import asyncio

async def get_info_raster_workspaces(url, username, password,workspace,store):
    """Print the result of ``get_raster_store(workspace, store)``.

    Args:
        url: GeoServer REST endpoint, passed third to ``AsyncGeoServerX``.
        username: Account name, passed first to ``AsyncGeoServerX``.
        password: Account password, passed second to ``AsyncGeoServerX``.
        workspace: Name of the workspace that holds the store.
        store: Name of the raster store to look up.
    """
    print("-------------start-----------------")
    geoserver = AsyncGeoServerX(username, password, url)
    store_info = await geoserver.get_raster_store(workspace, store)
    print(store_info)

async def main():
    """Look up raster store ``dsm`` in ``cesium`` through ``asyncio.gather``."""
    lookup = get_info_raster_workspaces(
        url='http://localhost:8080/geoserver/rest/',
        username='admin',
        password='GeoServer',
        workspace='cesium',
        store='dsm',
    )
    await asyncio.gather(lookup)


# Script entry point: run main() on a fresh event loop until it completes.
asyncio.run(main())

''' Console 
-------------start-----------------
coverageStore=CoveragesStoreModelDetail(name='dsm', description=None, enabled=True, workspace=WorkspaceInBulk(name='cesium', href='http://localhost:8080/geoserver/rest/workspaces/cesium.json'), url='file:///Users/krishnaglodha/Desktop/IGI_DATA/DSM/IGI_DSM1m1.tif', coverages='http://localhost:8080/geoserver/rest/workspaces/cesium/coveragestores/dsm/coverages.json', dateCreated='2023-02-23 13:39:48.417 UTC', metadata=None)
'''

Get all Styles in GeoServer

from geoserverx._async.gsx import AsyncGeoServerX
import asyncio

async def get_info_raster_workspaces(url, username, password):
    """Print the result of ``get_all_styles()`` for one GeoServer.

    Despite its name, this helper queries styles, not raster workspaces.

    Args:
        url: GeoServer REST endpoint, passed third to ``AsyncGeoServerX``.
        username: Account name, passed first to ``AsyncGeoServerX``.
        password: Account password, passed second to ``AsyncGeoServerX``.
    """
    print("-------------start-----------------")
    geoserver = AsyncGeoServerX(username, password, url)
    all_styles = await geoserver.get_all_styles()
    print(all_styles)

async def main():
    """Request all styles from the local GeoServer through ``asyncio.gather``."""
    query = get_info_raster_workspaces(
        url='http://localhost:8080/geoserver/rest/',
        username='admin',
        password='GeoServer',
    )
    await asyncio.gather(query)


# Script entry point: run main() on a fresh event loop until it completes.
asyncio.run(main())

''' Console 
-------------start-----------------
styles=allStyle(style=[allStyleList(name='burg', href='http://localhost:8080/geoserver/rest/styles/burg.json'), allStyleList(name='capitals', href='http://localhost:8080/geoserver/rest/styles/capitals.json'), allStyleList(name='cite_lakes', href='http://localhost:8080/geoserver/rest/styles/cite_lakes.json'), allStyleList(name='dem', href='http://localhost:8080/geoserver/rest/styles/dem.json'), allStyleList(name='generic', href='http://localhost:8080/geoserver/rest/styles/generic.json'), allStyleList(name='giant_polygon', href='http://localhost:8080/geoserver/rest/styles/giant_polygon.json'), allStyleList(name='grass', href='http://localhost:8080/geoserver/rest/styles/grass.json'), allStyleList(name='green', href='http://localhost:8080/geoserver/rest/styles/green.json'), allStyleList(name='line', href='http://localhost:8080/geoserver/rest/styles/line.json'), allStyleList(name='poi', href='http://localhost:8080/geoserver/rest/styles/poi.json'), allStyleList(name='point', href='http://localhost:8080/geoserver/rest/styles/point.json'), allStyleList(name='poly_landmarks', href='http://localhost:8080/geoserver/rest/styles/poly_landmarks.json'), allStyleList(name='polygon', href='http://localhost:8080/geoserver/rest/styles/polygon.json'), allStyleList(name='pophatch', href='http://localhost:8080/geoserver/rest/styles/pophatch.json'), allStyleList(name='population', href='http://localhost:8080/geoserver/rest/styles/population.json'), allStyleList(name='rain', href='http://localhost:8080/geoserver/rest/styles/rain.json'), allStyleList(name='raster', href='http://localhost:8080/geoserver/rest/styles/raster.json'), allStyleList(name='restricted', href='http://localhost:8080/geoserver/rest/styles/restricted.json'), allStyleList(name='simple_roads', href='http://localhost:8080/geoserver/rest/styles/simple_roads.json'), allStyleList(name='simple_streams', href='http://localhost:8080/geoserver/rest/styles/simple_streams.json'), allStyleList(name='tiger_roads', 
href='http://localhost:8080/geoserver/rest/styles/tiger_roads.json')])
'''

Get Single Style in GeoServer

from geoserverx._async.gsx import AsyncGeoServerX
import asyncio

async def get_info_raster_workspaces(url, username, password,style):
    """Print the result of ``get_style(style)`` for one GeoServer.

    Despite its name, this helper queries a style, not a raster workspace.

    Args:
        url: GeoServer REST endpoint, passed third to ``AsyncGeoServerX``.
        username: Account name, passed first to ``AsyncGeoServerX``.
        password: Account password, passed second to ``AsyncGeoServerX``.
        style: Name of the style to look up.
    """
    print("-------------start-----------------")
    geoserver = AsyncGeoServerX(username, password, url)
    style_info = await geoserver.get_style(style)
    print(style_info)

async def main():
    """Look up the ``poi`` style through ``asyncio.gather``."""
    lookup = get_info_raster_workspaces(
        url='http://localhost:8080/geoserver/rest/',
        username='admin',
        password='GeoServer',
        style='poi',
    )
    await asyncio.gather(lookup)


# Script entry point: run main() on a fresh event loop until it completes.
asyncio.run(main())
''' Console 
-------------start-----------------
style=SingleStyle(name='poi', format='sld', languageVersion=langVersion(version='1.0.0'), filename='poi.sld')
'''