Obsei: An open-source low-code AI powered automation tool





Obsei (pronounced "Ob see" | /əb-'sē/) is an open-source, low-code, AI-powered automation tool. Obsei consists of -

All the Observers can store their state in databases (Sqlite, Postgres, MySQL, etc.), making Obsei suitable for scheduled jobs or serverless applications.
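
To illustrate why persisted state matters for scheduled or serverless runs, here is a minimal sketch using Python's built-in sqlite3 module. The table name, the `source_id` key, and the helper functions are hypothetical and are not Obsei's actual schema or API; the point is only that each run can read the previous checkpoint and resume from there.

```python
import sqlite3
from typing import Optional


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) a tiny store holding one checkpoint per source."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS source_state ("
        "source_id TEXT PRIMARY KEY, last_seen TEXT NOT NULL)"
    )
    return conn


def save_checkpoint(conn: sqlite3.Connection, source_id: str, last_seen: str) -> None:
    """Insert the checkpoint, overwriting any earlier value for this source."""
    conn.execute(
        "INSERT OR REPLACE INTO source_state (source_id, last_seen) VALUES (?, ?)",
        (source_id, last_seen),
    )
    conn.commit()


def load_checkpoint(conn: sqlite3.Connection, source_id: str) -> Optional[str]:
    """Return the stored checkpoint, or None if this source has never run."""
    row = conn.execute(
        "SELECT last_seen FROM source_state WHERE source_id = ?", (source_id,)
    ).fetchone()
    return row[0] if row else None


conn = open_store()  # use a file path instead of ":memory:" to survive restarts
save_checkpoint(conn, "twitter-issue-search", "2021-01-01T00:00:00Z")
save_checkpoint(conn, "twitter-issue-search", "2021-01-02T00:00:00Z")
print(load_checkpoint(conn, "twitter-issue-search"))
```

A scheduled job would call `load_checkpoint` at start-up and `save_checkpoint` after each successful fetch, so no data is fetched twice.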

Obsei diagram

Future direction -

Introduction and demo video

Use cases

Obsei's use cases include, but are not limited to -

Companies/Projects using Obsei

Here are some companies/projects (alphabetical order) using Obsei. To add your company/project to the list, please raise a PR or contact us via email.

Tutorials

Demo

We have a minimal Streamlit-based UI that you can use to test Obsei.

Screenshot

Watch UI demo video

Introductory and demo video

Check demo at or at Demo Link

(Note: The Streamlit demo may sometimes fail due to rate limiting; in such cases, run the Docker image locally.)

To test locally, just run

docker run -d --name obsei-ui -p 8501:8501 obsei/obsei-ui-demo

# You can find the UI at http://localhost:8501

To run an Obsei workflow easily using GitHub Actions (no sign-ups or cloud hosting required), refer to this repo.

Documentation

For detailed installation instructions, usage, and examples, refer to our documentation.

Support and Release Matrix

1. On Windows, you have to install PyTorch manually. Refer to the official PyTorch instructions.

2. The Conda channel is missing a few dependencies, so install them manually -

pip install presidio-analyzer
pip install presidio-anonymizer
pip install zenpy
pip install searchtweets-v2
pip install google-play-scraper
pip install tweet-preprocessor
pip install gnews
pip install python-facebook-api
# GPL dependency
pip install trafilatura

How to use

Follow the steps below to create your workflow -

Install the following (if not present already) -

You can install Obsei either via PIP or Conda based on your preference.

NOTE: On Windows, you have to install PyTorch manually. Refer to https://pytorch.org/get-started/locally/.

Install via PIP:

To install the latest released version -

pip install obsei
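
After installing, you may want to confirm that the package is visible to the Python interpreter you intend to use. The following is a small sketch using the standard library's importlib.metadata (Python 3.8+); the helper name `installed_version` is made up for this example.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


version = installed_version("obsei")
if version is None:
    print("obsei is not installed in this environment")
else:
    print(f"obsei version: {version}")
```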

Install from master branch (if you want to try the latest features) -

git clone https://github.com/obsei/obsei.git
cd obsei
pip install --editable .

Install via Conda:

To install the latest version -

conda install -c lalitpagaria obsei

Install from master branch (if you want to try the latest features) -

git clone https://github.com/obsei/obsei.git
cd obsei
conda env create -f conda/environment.yml

For GPU based local environment -

git clone https://github.com/obsei/obsei.git
cd obsei
conda env create -f conda/gpu-environment.yml

Twitter source:

from obsei.source.twitter_source import TwitterCredentials, TwitterSource, TwitterSourceConfig

# initialize twitter source config
source_config = TwitterSourceConfig(
   keywords=["issue"], # Keywords, @user or #hashtags
   lookup_period="1h", # Lookup period from current time, format: `<number><d|h|m>` (day|hour|minute)
   cred_info=TwitterCredentials(
       # Enter your twitter consumer key and secret. Get it from https://developer.twitter.com/en/apply-for-access
       consumer_key="<twitter_consumer_key>",
       consumer_secret="<twitter_consumer_secret>",
       bearer_token='<ENTER BEARER TOKEN>',
   )
)

# initialize tweets retriever
source = TwitterSource()
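
Several source configs take a `lookup_period` such as "1h" in the `<number><d|h|m>` format. As a rough illustration of what that format means, the sketch below converts such a string into a start time. It is not Obsei's own parser; the function names are hypothetical, and only the d/h/m units are handled.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Optional

# Map each unit letter to the matching timedelta keyword argument
_UNITS = {"d": "days", "h": "hours", "m": "minutes"}


def parse_lookup_period(period: str) -> timedelta:
    """Convert a string like '1h' or '30m' into a timedelta."""
    match = re.fullmatch(r"(\d+)([dhm])", period)
    if match is None:
        raise ValueError(f"Unsupported lookup period: {period!r}")
    number, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(number)})


def lookup_start(period: str, now: Optional[datetime] = None) -> datetime:
    """Return the earliest timestamp covered by the lookup period."""
    now = now or datetime.now(timezone.utc)
    return now - parse_lookup_period(period)


print(parse_lookup_period("1h"))
print(lookup_start("2d"))
```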

YouTube comments source:

from obsei.source.youtube_scrapper import YoutubeScrapperSource, YoutubeScrapperConfig

# initialize Youtube source config
source_config = YoutubeScrapperConfig(
    video_url="https://www.youtube.com/watch?v=uZfns0JIlFk", # Youtube video URL
    fetch_replies=True, # Fetch replies to comments
    max_comments=10, # Total number of comments and replies to fetch
    lookup_period="1Y", # Lookup period from current time, format: `<number><d|h|m|M|Y>` (day|hour|minute|month|year)
)

# initialize Youtube comments retriever
source = YoutubeScrapperSource()

Facebook source:

from obsei.source.facebook_source import FacebookCredentials, FacebookSource, FacebookSourceConfig

# initialize facebook source config
source_config = FacebookSourceConfig(
   page_id="110844591144719", # Facebook page id, for example this one for Obsei
   lookup_period="1h", # Lookup period from current time, format: `<number><d|h|m>` (day|hour|minute)
   cred_info=FacebookCredentials(
       # Enter your facebook app_id, app_secret and long_term_token. Get it from https://developers.facebook.com/apps/
       app_id="<facebook_app_id>",
       app_secret="<facebook_app_secret>",
       long_term_token="<facebook_long_term_token>",
   )
)

# initialize facebook post comments retriever
source = FacebookSource()

Email source:

from obsei.source.email_source import EmailConfig, EmailCredInfo, EmailSource

# initialize email source config
source_config = EmailConfig(
   # List of IMAP servers for most commonly used email providers
   # https://www.systoolsgroup.com/imap/
   # Also, if you're using a Gmail account then make sure you allow less secure apps on your account -
   # https://myaccount.google.com/lesssecureapps?pli=1
   # Also enable IMAP access -
   # https://mail.google.com/mail/u/0/#settings/fwdandpop
   imap_server="imap.gmail.com", # Enter IMAP server
   cred_info=EmailCredInfo(
       # Enter your email account username and password
       username="<email_username>",
       password="<email_password>"
   ),
   lookup_period="1h" # Lookup period from current time, format: `<number><d|h|m>` (day|hour|minute)
)

# initialize email retriever
source = EmailSource()

Google Maps reviews source:

from obsei.source.google_maps_reviews import OSGoogleMapsReviewsSource, OSGoogleMapsReviewsConfig

# initialize Outscraper Maps review source config
source_config = OSGoogleMapsReviewsConfig(
   # Collect API key from https://outscraper.com/
   api_key="<Enter Your API Key>",
   # Enter Google Maps link or place id
   # For example below is for the "Taj Mahal"
   queries=["https://www.google.co.in/maps/place/Taj+Mahal/@27.1751496,78.0399535,17z/data=!4m5!3m4!1s0x39747121d702ff6d:0xdd2ae4803f767dde!8m2!3d27.1751448!4d78.0421422"],
   number_of_reviews=10,
)


# initialize Outscraper Maps review retriever
source = OSGoogleMapsReviewsSource()

App Store reviews source:

from obsei.source.appstore_scrapper import AppStoreScrapperConfig, AppStoreScrapperSource

# initialize app store source config
source_config = AppStoreScrapperConfig(
   # Two parameters are needed: app_id and countries.
   # `app_id` can be found at the end of the app's URL in the App Store.
   # For example - https://apps.apple.com/us/app/xcode/id497799835
   # `497799835` is the app_id for Xcode and `us` is the country.
   countries=["us"],
   app_id="497799835",
   lookup_period="1h" # Lookup period from current time, format: `<number><d|h|m>` (day|hour|minute)
)


# initialize app store reviews retriever
source = AppStoreScrapperSource()

Play Store reviews source:

from obsei.source.playstore_scrapper import PlayStoreScrapperConfig, PlayStoreScrapperSource

# initialize play store source config
source_config = PlayStoreScrapperConfig(
   # Two parameters are needed: package_name and countries.
   # `package_name` is the value of the `id` parameter in the app's Play Store URL.
   # For example - https://play.google.com/store/apps/details?id=com.google.android.gm&hl=en&gl=US
   # `com.google.android.gm` is the package_name for Gmail and `us` is the country.
   countries=["us"],
   package_name="com.google.android.gm",
   lookup_period="1h" # Lookup period from current time, format: `<number><d|h|m>` (day|hour|minute)
)

# initialize play store reviews retriever
source = PlayStoreScrapperSource()

Reddit source:

from obsei.source.reddit_source import RedditConfig, RedditSource, RedditCredInfo

# initialize reddit source config
source_config = RedditConfig(
   subreddits=["wallstreetbets"], # List of subreddits
   # Reddit account username and password
   # You can also enter reddit client_id and client_secret or refresh_token
   # Create credential at https://www.reddit.com/prefs/apps
   # Also refer https://praw.readthedocs.io/en/latest/getting_started/authentication.html
   # Currently Password Flow, Read Only Mode and Saved Refresh Token Mode are supported
   cred_info=RedditCredInfo(
       username="<reddit_username>",
       password="<reddit_password>"
   ),
   lookup_period="1h" # Lookup period from current time, format: `<number><d|h|m>` (day|hour|minute)
)

# initialize reddit retriever
source = RedditSource()

Note: Reddit heavily rate-limits scrapers, so use the Reddit scrapper source below only to fetch small amounts of data over a long period.

from obsei.source.reddit_scrapper import RedditScrapperConfig, RedditScrapperSource

# initialize reddit scrapper source config
source_config = RedditScrapperConfig(
   # RSS URL of a subreddit, search, etc. For the proper URL format, refer to
   # https://www.reddit.com/r/pathogendavid/comments/tv8m9/pathogendavids_guide_to_rss_and_reddit/
   url="https://www.reddit.com/r/wallstreetbets/comments/.rss?sort=new",
   lookup_period="1h" # Lookup period from current time, format: `<number><d|h|m>` (day|hour|minute)
)

# initialize reddit retriever
source = RedditScrapperSource()
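
To give a feel for what an RSS-based scrapper works with, the sketch below parses a feed using only the standard library. It assumes an Atom-style feed, and the sample XML and the `parse_entries` helper are made up for illustration; this is not how Obsei itself is implemented.

```python
import xml.etree.ElementTree as ET
from typing import Dict, List

ATOM_NS = {"atom": "http://www.w3.org/2005/Atom"}

# Hand-written sample feed; a real feed would be downloaded from the configured URL
SAMPLE_FEED = """<feed xmlns="http://www.w3.org/2005/Atom">
  <entry>
    <title>First comment</title>
    <updated>2021-01-01T10:00:00+00:00</updated>
    <content type="html">Hello there</content>
  </entry>
  <entry>
    <title>Second comment</title>
    <updated>2021-01-01T11:00:00+00:00</updated>
    <content type="html">Another one</content>
  </entry>
</feed>"""


def parse_entries(feed_xml: str) -> List[Dict[str, str]]:
    """Extract title, timestamp, and content from every entry in the feed."""
    root = ET.fromstring(feed_xml)
    entries = []
    for entry in root.findall("atom:entry", ATOM_NS):
        entries.append(
            {
                "title": entry.findtext("atom:title", default="", namespaces=ATOM_NS),
                "updated": entry.findtext("atom:updated", default="", namespaces=ATOM_NS),
                "content": entry.findtext("atom:content", default="", namespaces=ATOM_NS),
            }
        )
    return entries


for item in parse_entries(SAMPLE_FEED):
    print(item["updated"], item["title"])
```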

Google News source:

from obsei.source.google_news_source import GoogleNewsConfig, GoogleNewsSource

# initialize Google News source config
source_config = GoogleNewsConfig(
   query='bitcoin',
   max_results=5,
   # To fetch full article text enable `fetch_article` flag
   # By default google news gives title and highlight
   fetch_article=True,
   # proxy='http://127.0.0.1:8080'
)

# initialize Google News retriever
source = GoogleNewsSource()

Website crawler source:

from obsei.source.website_crawler_source import TrafilaturaCrawlerConfig, TrafilaturaCrawlerSource

# initialize website crawler source config
source_config = TrafilaturaCrawlerConfig(
   urls=['https://obsei.github.io/obsei/']
)

# initialize website text retriever
source = TrafilaturaCrawlerSource()

Pandas DataFrame source:

import pandas as pd
from obsei.source.pandas_source import PandasSource, PandasSourceConfig

# Initialize your Pandas DataFrame from a source such as CSV, Excel, SQL, etc.
# The following example reads a CSV file that has two columns: title and text
csv_file = "https://raw.githubusercontent.com/deepset-ai/haystack/master/tutorials/small_generator_dataset.csv"
dataframe = pd.read_csv(csv_file)

# initialize pandas source config
# Column names must exist in the DataFrame
source_config = PandasSourceConfig(
   dataframe=dataframe,
   include_columns=["title"],
   text_columns=["text"],
)

# initialize pandas source
source = PandasSource()

Note: To run transformers in an offline mode, check transformers offline mode.

Text classification: Classify text into user provided categories.

from obsei.analyzer.classification_analyzer import ClassificationAnalyzerConfig, ZeroShotClassificationAnalyzer

# initialize classification analyzer config
# It can also detect sentiments if "positive" and "negative" labels are added.
analyzer_config=ClassificationAnalyzerConfig(
   labels=["service", "delay", "performance"],
)

# initialize classification analyzer
# For supported models refer https://huggingface.co/models?filter=zero-shot-classification
text_analyzer = ZeroShotClassificationAnalyzer(
   model_name_or_path="typeform/mobilebert-uncased-mnli",
   device="auto"
)

Sentiment Analyzer: Detect the sentiment of the text. Text classification can also perform sentiment analysis, but if you don't want to use a heavy-duty NLP model, use the less resource-hungry, dictionary-based Vader sentiment detector.

from obsei.analyzer.sentiment_analyzer import VaderSentimentAnalyzer

# Vader does not need any configuration settings
analyzer_config=None

# initialize vader sentiment analyzer
text_analyzer = VaderSentimentAnalyzer()
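
To show why a dictionary-based detector is lightweight, here is a toy sketch of the general idea: look up each word in a small lexicon and average the scores. The lexicon, threshold, and function names are invented for this example; VADER's real lexicon and rules (negation, intensifiers, punctuation, and so on) are considerably richer.

```python
# Tiny hand-made lexicon: positive words get positive scores and vice versa
LEXICON = {
    "good": 1.0,
    "great": 2.0,
    "love": 2.0,
    "bad": -1.0,
    "slow": -1.0,
    "terrible": -2.0,
}


def score_text(text: str) -> float:
    """Average the lexicon scores of the known words; 0.0 if none are known."""
    words = [word.strip(".,!?").lower() for word in text.split()]
    hits = [LEXICON[word] for word in words if word in LEXICON]
    return sum(hits) / len(hits) if hits else 0.0


def label(score: float, threshold: float = 0.05) -> str:
    """Turn a numeric score into a coarse sentiment label."""
    if score > threshold:
        return "positive"
    if score < -threshold:
        return "negative"
    return "neutral"


for sentence in ["Great app, love it!", "Terrible and slow support.", "The sky is blue"]:
    print(sentence, "->", label(score_text(sentence)))
```

No model download or GPU is involved, which is the trade-off the dictionary-based approach makes against accuracy.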

NER (Named-Entity Recognition) Analyzer: Extract information and classify named entities mentioned in text into pre-defined categories such as person names, organizations, locations, medical codes, time expressions, quantities, monetary values, percentages, etc.

from obsei.analyzer.ner_analyzer import NERAnalyzer

# NER analyzer does not need configuration settings
analyzer_config=None

# initialize ner analyzer
# For supported models refer https://huggingface.co/models?filter=token-classification
text_analyzer = NERAnalyzer(
   model_name_or_path="elastic/distilbert-base-cased-finetuned-conll03-english",
   device = "auto"
)

Translation Analyzer:

from obsei.analyzer.translation_analyzer import TranslationAnalyzer

# Translator does not need analyzer config
analyzer_config = None

# initialize translator
# For supported models refer https://huggingface.co/models?pipeline_tag=translation
text_analyzer = TranslationAnalyzer(
   model_name_or_path="Helsinki-NLP/opus-mt-hi-en",
   device = "auto"
)

PII Analyzer:

from obsei.analyzer.pii_analyzer import PresidioEngineConfig, PresidioModelConfig, \
   PresidioPIIAnalyzer, PresidioPIIAnalyzerConfig

# initialize pii analyzer's config
analyzer_config = PresidioPIIAnalyzerConfig(
   # Whether to return only pii analysis or anonymize text
   analyze_only=False,
   # Whether to return detail information about anonymization decision
   return_decision_process=True
)

# initialize pii analyzer
text_analyzer = PresidioPIIAnalyzer(
   engine_config=PresidioEngineConfig(
       # spacy and stanza nlp engines are supported
       # For more info refer
       # https://microsoft.github.io/presidio/analyzer/developing_recognizers/#utilize-spacy-or-stanza
       nlp_engine_name="spacy",
       # Update desired spacy model and language
       models=[PresidioModelConfig(model_name="en_core_web_lg", lang_code="en")]
   )
)
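
The difference between analyzing only and anonymizing can be pictured with a deliberately simple, regex-based sketch. It detects email addresses only, and the `process` function and its output shape are hypothetical; Presidio relies on NLP engines and many more recognizers, so treat this as an illustration of the `analyze_only` idea rather than of Presidio's behavior.

```python
import re
from typing import Any, Dict, List

# Simplistic email pattern, good enough for a demonstration
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")


def find_emails(text: str) -> List[Dict[str, Any]]:
    """Report where email addresses occur in the text."""
    return [
        {"entity_type": "EMAIL_ADDRESS", "start": m.start(), "end": m.end()}
        for m in EMAIL_RE.finditer(text)
    ]


def process(text: str, analyze_only: bool) -> Dict[str, Any]:
    """Return findings only, or findings plus text with the emails replaced."""
    findings = find_emails(text)
    if analyze_only:
        return {"text": text, "findings": findings}
    return {"text": EMAIL_RE.sub("<EMAIL_ADDRESS>", text), "findings": findings}


sample = "Contact jane.doe@example.com today"
print(process(sample, analyze_only=True))
print(process(sample, analyze_only=False))
```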

Dummy Analyzer: Does nothing. It is simply used to transform the input (TextPayload) into the output (TextPayload) while adding user-supplied dummy data.

from obsei.analyzer.dummy_analyzer import DummyAnalyzer, DummyAnalyzerConfig

# initialize dummy analyzer's configuration settings
analyzer_config = DummyAnalyzerConfig()

# initialize dummy analyzer
text_analyzer = DummyAnalyzer()

Slack sink:

from obsei.sink.slack_sink import SlackSink, SlackSinkConfig

# initialize slack sink config
sink_config = SlackSinkConfig(
   # Provide slack bot/app token
   # For more detail refer https://slack.com/intl/en-de/help/articles/215770388-Create-and-regenerate-API-tokens
   slack_token="<Slack_app_token>",
   # To get channel id refer https://stackoverflow.com/questions/40940327/what-is-the-simplest-way-to-find-a-slack-team-id-and-a-channel-id
   channel_id="C01LRS6CT9Q"
)

# initialize slack sink
sink = SlackSink()

Zendesk sink:

from obsei.sink.zendesk_sink import ZendeskSink, ZendeskSinkConfig, ZendeskCredInfo

# initialize zendesk sink config
sink_config = ZendeskSinkConfig(
   # provide zendesk domain
   domain="zendesk.com",
   # provide subdomain if you have one
   subdomain=None,
   # Enter zendesk user details
   cred_info=ZendeskCredInfo(
       email="<zendesk_user_email>",
       password="<zendesk_password>"
   )
)

# initialize zendesk sink
sink = ZendeskSink()

Jira sink:

from obsei.sink.jira_sink import JiraSink, JiraSinkConfig

# For testing purposes, you can start a Jira server locally
# Refer to https://developer.atlassian.com/server/framework/atlassian-sdk/atlas-run-standalone/

# initialize Jira sink config
sink_config = JiraSinkConfig(
   url="http://localhost:2990/jira", # Jira server url
   # Jira username and password of a user who has permission to create issues
   username="<username>",
   password="<password>",
   # Type of issue to create
   # For more information, refer to https://support.atlassian.com/jira-cloud-administration/docs/what-are-issue-types/
   issue_type={"name": "Task"},
   # Project under which the issue is created
   # For more information, refer to https://support.atlassian.com/jira-software-cloud/docs/what-is-a-jira-software-project/
   project={"key": "CUS"},
)

# initialize Jira sink
sink = JiraSink()

Elasticsearch sink:

from obsei.sink.elasticsearch_sink import ElasticSearchSink, ElasticSearchSinkConfig

# For testing purposes, you can start an Elasticsearch server locally via Docker
# `docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.9.2`

# initialize Elasticsearch sink config
sink_config = ElasticSearchSinkConfig(
   # Elasticsearch server hostname
   host="localhost",
   # Elasticsearch server port
   port=9200,
   # Index name; the index is created if it does not exist
   index_name="test",
)

# initialize Elasticsearch sink
sink = ElasticSearchSink()

HTTP sink:

from obsei.sink.http_sink import HttpSink, HttpSinkConfig

# For testing purposes, you can create a mock HTTP server via Postman
# For more details, refer to https://learning.postman.com/docs/designing-and-developing-your-api/mocking-data/setting-up-mock/

# initialize http sink config (Currently only POST call is supported)
sink_config = HttpSinkConfig(
   # provide http server url
   url="https://localhost:8080/api/path",
   # Here you can add headers you would like to pass with request
   headers={
       "Content-type": "application/json"
   }
)

# To modify or convert the payload, create a convertor class
# Refer to obsei.sink.dailyget_sink.PayloadConvertor for an example

# initialize http sink
sink = HttpSink()
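
As a rough picture of what a POST-only sink with a payload convertor does, the sketch below builds (but does not send) a JSON POST request using the standard library. The `convert_payload` function and the payload field names are assumptions made for this example; they do not reproduce Obsei's PayloadConvertor interface.

```python
import json
import urllib.request
from typing import Any, Dict


def convert_payload(analyzer_output: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical convertor: keep only the fields the receiving API expects."""
    return {
        "text": analyzer_output.get("processed_text", ""),
        "labels": analyzer_output.get("segmented_data", {}),
    }


def build_post_request(
    url: str, payload: Dict[str, Any], headers: Dict[str, str]
) -> urllib.request.Request:
    """Serialize the payload as JSON and wrap it in a POST request object."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(url, data=body, headers=headers, method="POST")


analyzer_output = {
    "processed_text": "Delivery delay again",
    "segmented_data": {"delay": 0.9},
    "meta": {"ignored": True},
}
request = build_post_request(
    "https://localhost:8080/api/path",
    convert_payload(analyzer_output),
    {"Content-type": "application/json"},
)
print(request.get_method(), request.full_url)
# Sending is left out on purpose; urllib.request.urlopen(request) would perform the call
```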

Pandas DataFrame sink:

from pandas import DataFrame
from obsei.sink.pandas_sink import PandasSink, PandasSinkConfig

# initialize pandas sink config
sink_config = PandasSinkConfig(
   dataframe=DataFrame()
)

# initialize pandas sink
sink = PandasSink()

Logger sink: This is useful for testing and dry-running the pipeline.

from obsei.sink.logger_sink import LoggerSink, LoggerSinkConfig
import logging
import sys

logger = logging.getLogger("Obsei")
logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# initialize logger sink config
sink_config = LoggerSinkConfig(
   logger=logger,
   level=logging.INFO
)

# initialize logger sink
sink = LoggerSink()

The source fetches data from the selected origin and feeds it to the analyzer for processing; the analyzer's output is then passed to the sink, which delivers the notification.

# Uncomment if you want logger
# import logging
# import sys
# logger = logging.getLogger(__name__)
# logging.basicConfig(stream=sys.stdout, level=logging.INFO)

# This will fetch information from the configured source, e.g. Twitter, App Store, etc.
source_response_list = source.lookup(source_config)

# Uncomment if you want to log source response
# for idx, source_response in enumerate(source_response_list):
#     logger.info(f"source_response#'{idx}'='{source_response.__dict__}'")

# This will execute the analyzer (sentiment, classification, etc.) on the source data with the provided analyzer_config
analyzer_response_list = text_analyzer.analyze_input(
    source_response_list=source_response_list,
    analyzer_config=analyzer_config
)

# Uncomment if you want to log analyzer response
# for idx, an_response in enumerate(analyzer_response_list):
#    logger.info(f"analyzer_response#'{idx}'='{an_response.__dict__}'")

# Analyzer output added to segmented_data
# Uncomment to log it
# for idx, an_response in enumerate(analyzer_response_list):
#    logger.info(f"analyzed_data#'{idx}'='{an_response.segmented_data.__dict__}'")

# This will send the analyzed output to the configured sink, e.g. Slack, Zendesk, etc.
sink_response_list = sink.send_data(analyzer_response_list, sink_config)

# Uncomment if you want to log sink response
# for sink_response in sink_response_list:
#     if sink_response is not None:
#         logger.info(f"sink_response='{sink_response}'")
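
Save the snippets for your chosen source, analyzer, and sink, followed by the workflow code above, in a Python file (for example, example.py) and run it -

python example.py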
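
The source, analyzer, and sink flow can also be tried without any credentials or external services. The sketch below uses stand-in classes whose method names mirror the calls in the workflow code (`lookup`, `analyze_input`, `send_data`); the classes themselves, the `Payload` type, and the keyword matching are hypothetical and far simpler than Obsei's real components.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Payload:
    """Stand-in for the text payload passed between pipeline stages."""
    processed_text: str
    segmented_data: Dict[str, Any] = field(default_factory=dict)


class ListSource:
    """Source that 'fetches' texts from an in-memory list."""
    def lookup(self, config: List[str]) -> List[Payload]:
        return [Payload(processed_text=text) for text in config]


class KeywordAnalyzer:
    """Analyzer that tags each text with the labels it literally contains."""
    def analyze_input(
        self, source_response_list: List[Payload], analyzer_config: List[str]
    ) -> List[Payload]:
        for payload in source_response_list:
            text = payload.processed_text.lower()
            matched = [lbl for lbl in analyzer_config if lbl in text]
            payload.segmented_data = {"labels": matched}
        return source_response_list


class CollectingSink:
    """Sink that appends a formatted line per payload to a list."""
    def send_data(self, analyzer_responses: List[Payload], config: List[str]) -> List[str]:
        for response in analyzer_responses:
            config.append(f"{response.processed_text} -> {response.segmented_data['labels']}")
        return config


source, text_analyzer, sink = ListSource(), KeywordAnalyzer(), CollectingSink()
source_config = ["Delivery delay again", "Great service", "App performance is poor"]
analyzer_config = ["service", "delay", "performance"]
sink_config: List[str] = []

source_response_list = source.lookup(source_config)
analyzer_response_list = text_analyzer.analyze_input(
    source_response_list=source_response_list, analyzer_config=analyzer_config
)
sink_response_list = sink.send_data(analyzer_response_list, sink_config)
for line in sink_response_list:
    print(line)
```

Swapping any stand-in for a real Obsei component and its config keeps the three calls in the same order.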

Articles

Discussion forum

Discussions about Obsei take place on the community forum.

Upcoming release and changelog

The upcoming release plan and its progress can be tracked at link (suggestions are welcome). Refer to the releases for changelogs.

Security Issue

For any security issue, please contact us via email.

Stargazers over time

Maintainer

This project is being maintained by Lalit Pagaria.

License

Attribution

This would not have been possible without these open-source software projects.

Contribution

First off, thank you for even considering contributing to this package; every contribution, big or small, is greatly appreciated. Please refer to our Contribution Guideline and Code of Conduct.

Thanks so much to all our contributors