Source code for pyoko.db.schema_update

# -*- coding: utf-8 -*-
"""
Creates and updates Solr search schemas and indexes for pyoko models.
"""

# Copyright (C) 2015 ZetaOps Inc.
#
# This file is licensed under the GNU General Public License v3
# (GPLv3).  See LICENSE.txt for details.

from __future__ import print_function
import codecs
from sys import stdout
import time
import six
from riak import RiakError
from pyoko.conf import settings
from pyoko.db.connection import client
import os
import copy
from pyoko.manage import BaseThreadedCommand
from pyoko.manage import ReIndex

try:
    from urllib.request import urlopen
    from urllib.error import HTTPError
except ImportError:
    from urllib2 import urlopen, HTTPError


class FakeContext(object):
    def has_permission(self, perm):
        return True


fake_context = FakeContext()
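
# FakeContext lets models be instantiated outside a real request, since the
# migration code has no user to check permissions against, e.g.
# (hypothetical model name):
#
#     ins = Student(fake_context)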


def wait_for_schema_creation(index_name):
    url = 'http://%s:8093/internal_solr/%s/select' % (settings.RIAK_SERVER, index_name)
    print("pinging solr schema: %s" % url, end='')
    while True:
        try:
            urlopen(url)
            return
        except HTTPError as e:
            if e.code == 404:
                time.sleep(1)
                import traceback
                print(traceback.format_exc())
            else:
                raise


def wait_for_schema_deletion(index_name):
    url = 'http://%s:8093/internal_solr/%s/select' % (settings.RIAK_SERVER, index_name)
    i = 0
    while True:
        i += 1
        stdout.write("\r Waiting for actual deletion of solr schema %s %s" % (
            index_name, i * '.'))
        stdout.flush()
        try:
            urlopen(url)
            time.sleep(1)
        except HTTPError as e:
            print("")
            if e.code == 404:
                return
            else:
                raise


def get_schema_from_solr(index_name):
    url = 'http://%s:8093/internal_solr/%s/admin/file?file=%s.xml' % (
        settings.RIAK_SERVER, index_name, index_name)
    try:
        return urlopen(url).read()
    except HTTPError as e:
        if e.code == 404:
            return ""
        else:
            raise
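
# A minimal sketch of how the helpers above cooperate during a migration
# (hypothetical index name 'models_student'; new_schema would come from
# SchemaUpdater.compile_schema below):
#
#     client.delete_search_index('models_student')
#     wait_for_schema_deletion('models_student')   # poll until Solr returns 404
#     client.create_search_schema('models_student', new_schema)
#     wait_for_schema_creation('models_student')   # poll until Solr answers
#     get_schema_from_solr('models_student')       # fetch the deployed XML back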


class SchemaUpdater(object):
    """
    Traverses through all models, collects the fields marked for indexing or
    storing in Solr, then creates a Solr schema for these fields.
    """
    FIELD_TEMPLATE = '<field type="{type}" name="{name}" indexed="{index}" ' \
                     'stored="{store}" multiValued="{multi}" />'

    def __init__(self, models, threads, force):
        self.report = []
        self.models = models
        self.force = force
        self.client = client
        self.threads = int(threads)
        self.n_val = client.bucket_type(settings.DEFAULT_BUCKET_TYPE).get_property('n_val')
        self.base_thread = BaseThreadedCommand()

    def run(self, check_only=False):
        """
        Args:
            check_only (bool): if True, do not migrate; only report whether a
                migration is needed.
        """
        reindex = ReIndex().reindex_model
        models = copy.deepcopy(self.models)
        num_models = len(models)
        # 'foo_index' is a throwaway index; buckets are parked on it while
        # their real index is deleted and recreated.
        self.client.create_search_index('foo_index', '_yz_default', n_val=self.n_val)
        print("Schema creation started for %s model(s) with max %s threads\n" % (
            num_models, self.threads))
        exec_models = []
        self.base_thread.do_with_submit(self.find_models_and_delete_search_index,
                                        models, self.force, exec_models, check_only,
                                        threads=self.threads)
        if exec_models:
            models = copy.deepcopy(exec_models)
            self.creating_schema_and_index(models, self.create_schema)
            self.creating_schema_and_index(models, self.create_index)
            self.base_thread.do_with_submit(reindex, models, threads=self.threads)
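
    # A minimal usage sketch (hypothetical `Student` model; assumes Riak and
    # Solr are reachable at settings.RIAK_SERVER):
    #
    #     updater = SchemaUpdater(models=[Student], threads=4, force=False)
    #     updater.run(check_only=False)  # migrate schemas/indexes, then reindex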

    def find_models_and_delete_search_index(self, model, force, exec_models, check_only):
        """
        Decides whether the given model needs a migration and, if so, deletes
        its existing search index. Models that still need work are gathered
        into exec_models for the remaining operations.

        Args:
            model: model to execute
            force (bool): if True, the model is migrated unconditionally
            exec_models (list): models that need a migration are appended to
                this list; a model whose schema is already up to date is not.
            check_only (bool): if True, do not migrate; only report whether a
                migration is needed.
        """
        ins = model(fake_context)
        fields = self.get_schema_fields(ins._collect_index_fields())
        new_schema = self.compile_schema(fields)
        bucket_name = model._get_bucket_name()
        bucket_type = client.bucket_type(settings.DEFAULT_BUCKET_TYPE)
        bucket = bucket_type.bucket(bucket_name)
        index_name = "%s_%s" % (settings.DEFAULT_BUCKET_TYPE, bucket_name)
        if not force:
            try:
                schema = get_schema_from_solr(index_name)
                if schema == new_schema:
                    print("Schema %s is already up to date, nothing to do!" % index_name)
                    return
                elif check_only and schema != new_schema:
                    print("Schema %s is not up to date, migrate this model!" % index_name)
                    return
            except:
                import traceback
                traceback.print_exc()
        # Park the bucket on the throwaway index so the real one can be dropped.
        bucket.set_property('search_index', 'foo_index')
        try:
            client.delete_search_index(index_name)
        except RiakError as e:
            if 'notfound' != e.value:
                raise
        wait_for_schema_deletion(index_name)
        exec_models.append(model)

    def creating_schema_and_index(self, models, func):
        """
        Executes the given function over the given models, retrying any that
        could not be serviced immediately.

        Args:
            models: models to execute
            func: function to execute (create_schema or create_index)
        """
        waiting_models = []
        self.base_thread.do_with_submit(func, models, waiting_models,
                                        threads=self.threads)
        if waiting_models:
            print("WAITING MODELS ARE BEING CHECKED...")
            self.creating_schema_and_index(waiting_models, func)
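
    # Note: create_schema/create_index append any model that Riak could not
    # service immediately to waiting_models, so this method recurses until a
    # pass completes with an empty waiting list.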

    def create_schema(self, model, waiting_models):
        """
        Creates the search schema for the given model.

        Args:
            model: model to execute
            waiting_models: if Riak cannot return a response immediately, the
                model is queued here; after the first pass the method is run
                again with the waiting models, so that all given models are
                eventually processed.
        """
        bucket_name = model._get_bucket_name()
        index_name = "%s_%s" % (settings.DEFAULT_BUCKET_TYPE, bucket_name)
        ins = model(fake_context)
        fields = self.get_schema_fields(ins._collect_index_fields())
        new_schema = self.compile_schema(fields)
        schema = get_schema_from_solr(index_name)
        if schema != new_schema:
            try:
                client.create_search_schema(index_name, new_schema)
                print("+ %s (%s) search schema is created." % (model.__name__, index_name))
            except:
                print("+ %s (%s) search schema creation is queued for retry." % (
                    model.__name__, index_name))
                waiting_models.append(model)

    def create_index(self, model, waiting_models):
        """
        Creates the search index for the given model and attaches it to the
        model's bucket.

        Args:
            model: model to execute
            waiting_models: if Riak cannot return a response immediately, the
                model is queued here; after the first pass the method is run
                again with the waiting models, so that all given models are
                eventually processed.
        """
        bucket_name = model._get_bucket_name()
        bucket_type = client.bucket_type(settings.DEFAULT_BUCKET_TYPE)
        index_name = "%s_%s" % (settings.DEFAULT_BUCKET_TYPE, bucket_name)
        bucket = bucket_type.bucket(bucket_name)
        try:
            client.get_search_index(index_name)
            if bucket.get_property('search_index') != index_name:
                bucket.set_property('search_index', index_name)
            print("+ %s (%s) search index is created." % (model.__name__, index_name))
        except RiakError:
            try:
                client.create_search_index(index_name, index_name, self.n_val)
                bucket.set_property('search_index', index_name)
                print("+ %s (%s) search index is created." % (model.__name__, index_name))
            except RiakError:
                print("+ %s (%s) search index creation is queued for retry." % (
                    model.__name__, index_name))
                waiting_models.append(model)
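
    # After a successful pass each bucket is attached to its own index; e.g.
    # (hypothetical bucket named 'student' under the default bucket type):
    #
    #     bucket = client.bucket_type(settings.DEFAULT_BUCKET_TYPE).bucket('student')
    #     bucket.get_property('search_index')  # -> '<bucket_type>_student'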

    def create_report(self):
        """
        Creates a text report for the human user.

        :return: str
        """
        if self.report:
            # Note: assumes self.t1 was set to the start time elsewhere.
            self.report += "\n Operation took %s secs" % round(time.time() - self.t1)
        else:
            self.report = "Operation failed: %s \n" % self.report
        return self.report

    @classmethod
    def get_schema_fields(cls, fields):
        """
        :param list[tuple] fields: list of field property tuples
            (name, field_type, index, store, multi)
        :rtype: list[str]
        :return: schema field definitions
        """
        return [cls.FIELD_TEMPLATE.format(name=name,
                                          type=field_type,
                                          index=str(index).lower(),
                                          store=str(store).lower(),
                                          multi=str(multi).lower())
                for name, field_type, index, store, multi in fields]
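
    # For example, the field tuple ('name', 'string', True, True, False)
    # renders through FIELD_TEMPLATE as:
    #
    #     <field type="string" name="name" indexed="true" stored="true" multiValued="false" />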

    @staticmethod
    def compile_schema(fields):
        """
        Joins the schema fields with the base Solr schema template.

        :param list[str] fields: field list
        :return: compiled schema
        :rtype: str (bytes on Python 2)
        """
        path = os.path.dirname(os.path.realpath(__file__))
        with codecs.open("%s/solr_schema_template.xml" % path, 'r', 'utf-8') as fh:
            schema_template = fh.read()
        schema = schema_template.format('\n'.join(fields))
        if six.PY2:
            return schema.encode('utf-8')
        return schema
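
    # Note: compile_schema assumes solr_schema_template.xml contains a single
    # positional '{}' placeholder (presumably in its <fields> section) that
    # receives the joined <field .../> lines via str.format().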