Awesome piece of software; it lets you do MapReduce-style stuff and a lot more. It can run on Hadoop.
Basically, you set up a pipeline that runs a series of operations over a massive amount of data. For example, let's say you had a huge dump of metadata for Wikipedia pages, one line of JSON per URL, all in a single file. You have this huge - let's say 50 terabyte - file.
You can do things like map, filter, etc. on it and write it out very easily in PySpark, with something that looks like:
    data.filter(lambda x: x['genre'] == 'history') \
        .map(lambda x: x['citations']) \
        .map(lambda cites: [get_hostname(c['url']) for c in cites]) \
        .saveAsTextFile('myresults')
That's just example pseudocode, but the idea is you can do things like this very quickly over huge amounts of data. That pipeline might filter out all metadata of pages related to history, pull out the citations, get the hostname of the URLs in the citations and dump it all to a file.
And behind the scenes, the data is shuffled out to some number of servers, each works on its slice of the dataset, and the results come back - so with a small amount of code you can compute stuff like that on terabytes of data in minutes (or even seconds with enough servers), just by distributing it across a huge cluster of servers.
It abstracts a lot of the stuff you'd otherwise need to do to work with Hadoop - data shuffling and all that. Extremely cool stuff. I hate to say "big data," but this is an example of how you might work with big data. Extremely scalable, extremely flexible in what you can do, and easy as hell for solving random problems someone might throw at you. It doesn't matter if the data is a huge file of serial JSON, if it's in MySQL, or even if it's a huge Parquet file. It'll abstract all that bs out so you can do what you need to do, and use all the computing resources available to do it as quickly as possible.
The difference is that Hadoop and company brought something completely new to the table, and their successors are major upgrades. In addition, they are fairly well designed and documented, and they are released only when they are minimally production-ready. Hadoop is 8 years old. None of that can be said about the newest, shiniest JS framework: reiterations of already existing solutions with marginal benefit.
The biggest issue I see is that you are using format("memory"), which is traditionally only for debugging purposes because it stores the output in memory on the driver node. So I would imagine that after running for a long time, memory fills up, causing expensive JVM garbage collection. Check out the output sinks: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
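For illustration, a rough sketch of swapping the memory sink for a durable file sink (this is not your job - the "rate" test source, paths, and app name are made up):

    import org.apache.spark.sql.SparkSession

    // Toy sketch: a "rate" test stream written to a file sink instead of "memory".
    val spark = SparkSession.builder.appName("sink-example").getOrCreate()
    val streamingDF = spark.readStream.format("rate").load()

    val query = streamingDF.writeStream
      .format("parquet")                                       // file sink; nothing accumulates on the driver
      .option("path", "/tmp/stream-output")                    // made-up output path
      .option("checkpointLocation", "/tmp/stream-checkpoints") // made-up checkpoint path
      .start()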
MLlib in Spark is picking up quite a bit of steam at the moment. It's available through Scala, Java, and Python interfaces, and I hear they are preparing an R interface too.
The features are listed here: http://spark.apache.org/docs/latest/mllib-guide.html.
Spark being Spark, this is supposed to scale to a cluster effortlessly too.
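As a rough illustration of the newer DataFrame-based API (the tiny training set below is made up), the same code runs unchanged whether the DataFrame lives on a laptop or a cluster:

    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("mllib-sketch").getOrCreate()
    import spark.implicits._

    // Tiny made-up training set: (label, features)
    val training = Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    ).toDF("label", "features")

    val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01)
    val model = lr.fit(training)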
I can't speak much on the Clojure side as I've only tinkered with it. I got totally swept up in Scala when we started using it at my job. A very strong practical case for Scala is Apache Spark.
Remember that guy who dumped that massive reddit comment corpus? I downloaded the data from the archive.org location and dumped all of the bzip2 files into a folder called "reddit" on my Desktop. Running an ls would give output like this:
RC_2007-10.bz2
RC_2007-11.bz2
RC_2007-12.bz2
...
many more files
...
RC_2015-03.bz2
RC_2015-04.bz2
RC_2015-05.bz2
There are 92 files in all. Then I fired up spark-shell, a Scala REPL that loads Spark by default, and wrote the following line of code:
scala> sc.textFile("reddit").count()
I then watched as Spark examined the bzip2 files in my "reddit" folder, decompressed them, and counted the records, all in parallel. I could've also done this in Python (because Spark supports it as well) but most of the magic I saw happened because of Spark, and under the covers Spark is written in Scala.
There's a whole lot more to both Scala and Spark than this trivial example, of course, but I feel that this tiny single line of code really makes the point: on the surface, you see an elegant, concise, domain-specific language. Under the covers, there is a heavy-duty, parallel-computing, howlingly fast JVM monster.
Spark is pretty simple: it's an engine for big data processing that is faster than MapReduce (it makes much more effective use of RAM instead of disk). You get a simple API for that engine, ways to run it in batch mode, in streaming mode, or using SQL, and libraries that let you run statistics and machine learning on top of it.
You can download Spark locally, and the Spark quickstart is a good place to start. When you're just starting out, I'd suggest writing Spark with Python using the IPython Notebook; it's much more interactive (and IMO more fun) than constantly using maven/sbt to build jars and then run them. If later on you want to use the primary API or get more speed, Scala is a great option.
If you're using Standalone mode, this documentation describes how to make Spark highly available.
If you're running Spark on YARN, there is no SPOF in YARN itself but you should still carefully design the spark-submit script to retry upon failure, etc.
According to this:
http://spark.apache.org/docs/latest/hardware-provisioning.html
Spark can run in 8GB. But yes, some of the lighter weight alternatives mentioned by others are probably a better bet.
Theoretically yes, though I have not done it. Spark allows connections to databases using JDBC: http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
That being said, I think there's a lot to consider here. If you're doing this for things like the word counts you mentioned above, then Spark could be useful. If there are only a few columns that you're querying over, maybe it's worth doing all of your work on the database side - for example, adding indices to those columns to help speed up your queries.
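For what it's worth, a minimal sketch of that JDBC read (the URL, credentials, and table/column names below are made up):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("jdbc-sketch").getOrCreate()
    import spark.implicits._

    // Made-up connection details and table name.
    val articles = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://dbhost:3306/mydb")
      .option("dbtable", "articles")
      .option("user", "reader")
      .option("password", "secret")
      .load()

    // Simple filters like this can be pushed down to the database,
    // which is exactly where an index on the column helps.
    val history = articles.filter($"genre" === "history")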
What do you mean by this?
>I can't dump my data into text files because I need to be able to retrieve and update individual rows daily
Spark is for analysis. I wouldn't recommend updating your database from Spark - that can get very messy. If you're using Spark for its intended purpose - data analysis - then I don't think you should shy away from dumping your data out daily to text. You should play around with what you get from the JDBC connection. If it meets your requirements, great. If not, I think you should refine your objective and remove some of your constraints (e.g. the "no text-file dump" constraint).
myList foreach { x => println(x) } can be myList foreach println(_) or just myList foreach println. So pages.flatMap(wikiText) is just shorthand for pages.flatMap(wikiText(_)) or, if you prefer, pages flatMap { page => wikiText(page) }.

I'm assuming readFile is actually SparkContext#textFile, in which case the type of plainText is RDD[(String, String)].

a.zip(b).map(Some.apply), but see below. pageXml.split('\n'), but see below.

OK, here's the "below" part: why is wikiText's type Option[(String, String)] if you're returning Some? You can eliminate the Option. Then you could just use .map(wikiText), and your a.zip(b) won't need the superfluous .map(Some.apply) either.

pageXML is already one page from the Wikipedia dump, because textFile already breaks the incoming file up by lines. When doing statistics on Wikipedia, I doubt you want to be any finer-grained than the page level, and anyway, new EnglishWikipediaPage makes explicit that you're dealing with a page. You also shouldn't need .filter(_ != null) when using a Scala API.

Hope this helps!
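For anyone following along, a rough sketch of the Option + flatMap pattern being discussed (wikiText and the input format here are invented, not the original code; this version returns None for malformed lines, which is the case where keeping the Option actually pays off):

    // Parse one line of the dump into (title, text); malformed lines become None.
    def wikiText(line: String): Option[(String, String)] =
      line.split("\t", 2) match {
        case Array(title, text) => Some(title -> text)
        case _                  => None
      }

    val plainText = sc.textFile("wikipedia-dump.tsv")  // RDD[String], one page per line
    val pages     = plainText.flatMap(wikiText)        // bad lines drop out; no .filter(_ != null) needed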
Their distributed systems technology is also the secret sauce. A super-awesome-badass machine learning framework for a single machine isn't interesting at all. For scaling, stick with Spark's MLlib.
Load the offsets from the SSC (Spark StreamingContext) and create the writer with that offset.
I'd poke around http://spark.apache.org/docs/1.2.2/api/java/org/apache/spark/streaming/StreamingContext.html#StreamingContextState()
This function seems to have an offset mechanism:
Are you using spark-shell or spark-submit?
If you're using spark-shell you can add
--driver-memory 8g --executor-memory 8g
to your call. For spark-submit you can edit the conf file and set those properties there, or you can pass in a --conf
argument. There's also the ability to set it in the code itself. There are a few examples here: http://spark.apache.org/docs/latest/configuration.html. I could be wrong, but I think the default is only about 256m if you don't specify it.
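If you do want it in code, a rough sketch (the values are just examples); note that in client mode the driver JVM is already running by the time this executes, so spark.driver.memory is usually better passed on the command line:

    import org.apache.spark.{SparkConf, SparkContext}

    // Rough sketch of setting executor memory programmatically.
    val conf = new SparkConf()
      .setAppName("memory-example")            // made-up app name
      .set("spark.executor.memory", "8g")      // per-executor heap

    val sc = new SparkContext(conf)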
EDIT: added further explanation
Have you looked into using Apache Spark?
It allows you to easily parallelize various data processing tasks. You can scale it to clusters and it has a Python interface. Of course, it's not suited to all problems, but many data analysis tasks fall under its umbrella. It has a very simple interface and is easy to learn. Check out their programming guide for an overview of what they offer.
I actually did a small benchmark of Python and Scala running on different numbers of cores to see how it scales. You can find the comparison here if you're interested. There's sample code at the bottom that shows a very basic workflow using the Spark API.
Agreed, and Java has a growing number of distributed computation frameworks such as Spark (http://spark.apache.org/) and GPU computation frameworks such as Aparapi (https://code.google.com/p/aparapi/). These both currently require a lot of modifications, and the general-purpose tools will probably not match the performance that hand-coded C++ would provide, but the savings in development time are remarkable.
I don't know what I'm talking about, but as a user, it would be very cool to have some compiler library that would allow me to annotate my Java program and say "this region is well suited for a GPU, run it on there if possible," or "this region is well suited for distributing across Spark, run it there if a cluster is available."
Perhaps we can use this project (WalnutiQ) to create an intelligent compiler system to do this for us. :-)
One major benefit is that you avoid a lot of duplicated code. For instance, if you want to transform each element in a collection and get a new one, and you do it by looping over the collection, you have to create a new collection, write the loop, transform each element, and insert each element into the new collection. The issue is that the only thing that varies between transformations is the transformation of the element itself; the rest of the code is duplicated each time. If you instead use a function such as map, you only have to write the application of map to the collection plus the transformation code. And given how common operations on collections are, hand-written loops lead to a lot of duplicated code and thus a lot more potential for bugs in the looping.
Another benefit is that if you describe the kind of operation you're doing in an abstract and limited way, the collection you are using may be able to take advantage of the constraints imposed by that kind of operation. For instance, if you use map to transform each element in a collection instead of doing it with direct looping, the collection controls the looping, and since map constrains the transformation to each element independently, the collection can parallelize or distribute the computation. For examples of this, see Apache Spark and some of the code examples there.
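A tiny, made-up illustration of that duplication argument (plain Scala collections, not Spark):

    val xs = List(1, 2, 3, 4)

    // Hand-rolled loop: the new-collection plumbing is repeated for every such transformation.
    val doubledLoop = {
      val buf = scala.collection.mutable.ListBuffer.empty[Int]
      for (x <- xs) buf += x * 2
      buf.toList
    }

    // map: only the per-element transformation is written; the collection (or a
    // framework like Spark) controls the iteration and is free to parallelize it.
    val doubledMap = xs.map(_ * 2)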
I meant to paste this one also: http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameReader.option.html#pyspark.sql.DataFrameReader.option :)
Please help, I'm dying here lol, been stuck for 2 days.
Spark has its own set of machine learning libraries that are analogous - take a look at Spark MLlib: http://spark.apache.org/docs/latest/ml-guide.html. Note that there is an RDD API and a DataFrame API - stick with the DataFrame API if possible.
Thanks for the great questions.
It's a fun way of executing Spark Column functions. Sometimes it's just fun to play with a framework without a specific production use case. I am happy to have an easy way to execute Spark code from the command line.
Yea, if Spark isn't needed, definitely just use Scala, but this technique lets you easily execute these functions from the shell.
I ran some testing benchmarks and here are the prelim results on my machine (for the same logic):
I don't think writing tests with evalString is the best (will explain that more in another post), but it's the fastest ;)
What do you mean by "general programming"? Every problem has a tool/language best suited for it. If you want to be near the hardware with a lot of freedom, you pick a C-based language. If you want to write a website, there are frameworks in lots of languages for that. Ada was a government project to try to standardize a language across military tech (it's used all over the place in military aviation), while Spark's bread and butter is big data, so it's used across a bunch of industries.
For companies using spark, see http://spark.apache.org/powered-by.html
For companies using Ada, see https://www2.seas.gwu.edu/~mfeldman/ada-project-summary.html
You'll notice lots of the entries in the Ada list are in the aviation industry... ATC, lots of commercial and military aircraft, trains, space rockets and vehicles, some finance and medicine, but aviation, space, and trains keep us Ada developers employed.
To go from rows to columns, you can groupBy on user_id and game_id, then use collect_list (pyspark docs) in an aggregation to build arrays for card_face, card_suit, etc. Then use the column getItem method (docs) to create a column from the first/second element of each array.
    import pyspark.sql.functions as F

    cols_to_rowify = ["card_face", "card_suit"]
    array_cols = [F.collect_list(x).alias(x + "_array") for x in cols_to_rowify]
    item_cols = [F.col(x + "_array").getItem(0).alias(x + "_1") for x in cols_to_rowify]

    table\
        .groupBy("user_id", "game_id")\
        .agg(*array_cols)\
        .select(
            "user_id",
            "game_id",
            *item_cols)
will pull out the first item in card_face and card_suit; getItem(1) would get the second. If the arrays can be of variable length, then you'd probably want something like

    F.when(F.size(F.col("card_face_array")) >= 4, F.col("card_face_array").getItem(3)) \
        .otherwise("missing value")

so you don't index past the end of a shorter array.
Spark can read directly from Hive tables, as long as enableHiveSupport() is called on the SparkSession builder.
So you can just do that, and then your spark.read.table(...) will work on Hive table names.
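For illustration, roughly what that looks like (the app name and table name are made up):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("hive-example")        // made-up app name
      .enableHiveSupport()            // wires the session to the Hive metastore
      .getOrCreate()

    val df = spark.read.table("my_db.my_hive_table")   // made-up Hive table name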
Spark is such an overloaded term in software engineering.
I'm sure there's more.
So it’s not really a connection between IntelliJ and HDFS. It’s a connection between the Spark client on my machine and HDFS. My setup is a little unique since our cluster is a MapR distro of Hadoop. This doc from Apache explains how you can submit a local JAR to a remote cluster I think.
Essentially the process is:
1. Write code in IntelliJ
2. Build a JAR with SBT or Maven
3. Submit the JAR to YARN with the Spark client on my machine
I've found the pyspark.sql documentation nice and readable. The basic PySpark DataFrame operations are basically the same as in pandas. You can also work with Spark in a Jupyter notebook using findspark.
"Several external tools can be used to help profile the performance of Spark jobs:
Cluster-wide monitoring tools, such as Ganglia, can provide insight into overall cluster utilization and resource bottlenecks. For instance, a Ganglia dashboard can quickly reveal whether a particular workload is disk bound, network bound, or CPU bound. OS profiling tools such as dstat, iostat, and iotop can provide fine-grained profiling on individual nodes. JVM utilities such as jstack for providing stack traces, jmap for creating heap-dumps, jstat for reporting time-series statistics and jconsole for visually exploring various JVM properties are useful for those comfortable with JVM internals." [Ref. Apache Spark - Official Documentation - Advanced Instrumentation http://spark.apache.org/docs/latest/monitoring.html#advanced-instrumentation]
Thanks for taking the time to respond. I'm less interested in the advantages and disadvantages of the different APIs, and more interested in the relationship between the RDD and Dataset classes as they are implemented. The documentation for RDD still says it is "the basic abstraction in Spark", but that doesn't appear to be the case. Specifically, Dataset is not built on top of RDD, from what I can tell. Can you comment on this question?
Okay, yeah, it seems my sbt file was totally wrong; I was running Scala 2.11 all along. I tried updating the dependencies to use 2.11 builds of everything, but now I just get an error with spark-csv:
    name := "Averager"
    version := "1.0"
    scalaVersion := "2.11.8"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
    libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.0"
    libraryDependencies += "com.databricks" % "spark-csv_2.11" % "1.5.0"
But now I get:
    Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: com.databricks.spark-csv. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:569)
Reading specs is pretty important though. Sometimes the tech you work with doesn't really have nice guides, and you're just stuck reading documentation that's dozens of pages long.
To do real-time computation on the stream, you probably don't want 2 instances of Spark running. Instead, have one Spark application and within your Spark app, set up as many Kinesis receiver DStreams as you have shards, so that each DStream consumes one shard. See Spark Streaming Kinesis Architecture for more info on this.
Then, you can use DStream operations in your Spark app like union() to combine the separate streams, reduceByKeyAndWindow() to calculate totals for sliding time windows or updateStateByKey() to maintain cumulative totals, etc. This part really depends on what computation you want to do over the streaming data.
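A hedged sketch of that union + windowing step (the per-shard kinesisStreams and the key/count pairs are placeholders; building the receivers with KinesisUtils is omitted):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream

    val conf = new SparkConf().setAppName("kinesis-totals")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // kinesisStreams: one DStream[(String, Long)] per shard, created elsewhere.
    def slidingTotals(kinesisStreams: Seq[DStream[(String, Long)]]): DStream[(String, Long)] =
      ssc.union(kinesisStreams)                                                   // combine the shard streams
        .reduceByKeyAndWindow((a: Long, b: Long) => a + b, Seconds(60), Seconds(10))  // sliding 60s totals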
Spark will distribute the computation over the DStreams across the cluster resources that you provide to it.
There's a lot we could write here, but some initial guidance: (I'm assuming you are literally working on actual continuous, ongoing streams of video, as opposed to e.g. analysis of large video files.)
For your situation, I recommend you use either Python or Scala, whichever you are most comfortable with right now. Scala lets you do a few more things, and with better performance, but the key for you personally is avoiding spending time on anything not absolutely necessary, at least right now.
Focus first on learning RDDs. Your best friend is the Spark Programming guide:
http://spark.apache.org/docs/latest/programming-guide.html
Invest particular effort in the section "Resilient Distributed Datasets (RDDs)". (You can skip over Shared Variables for now, for example.) That done, start learning about Spark Streaming:
http://spark.apache.org/docs/latest/streaming-programming-guide.html
You can put off learning about DataFrames for now. Generally speaking, I advise the opposite: most people should start with DataFrames, and then go down to RDDs only if they need to. However, Spark Streaming currently is built to operate on RDDs, so for your goal it's better to focus. Just learning RDDs + streaming will keep you busy :) Eventually I do advise moving to DataFrames (or maybe its generalization Datasets if you're using Scala), but that is a low priority given you mainly need to determine whether using Spark at all is going to be viable for what you need.
By the way: In Spark 2.0, which is just around the corner, there is an initial release of something called "Structured Streaming". It will still be a bit experimental and incomplete compared to standard streaming, though, and you won't be able to find online answers to the questions you'll have until it's been out a while. So stick with standard streaming for now.
Good luck!
[For posterity: advice above is current for Spark 1.6, and very likely, Spark 2.0 as well]
If you're into Python, take a look at http://www.nltk.org. For Perl there is http://search.cpan.org/~kwilliams/Algorithm-NaiveBayes-0.04/lib/Algorithm/NaiveBayes.pm. For the JVM you can check out Spark MLlib: http://spark.apache.org/mllib/
Perhaps you're looking for this: http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>Spark provides a mechanism to dynamically adjust the resources your application occupies based on the workload. This means that your application may give resources back to the cluster if they are no longer used and request them again later when there is demand. This feature is particularly useful if multiple applications share resources in your Spark cluster.
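If you go that route, the relevant settings look roughly like this (whether you need the external shuffle service depends on your cluster manager and Spark version; the bounds are just examples):

    import org.apache.spark.SparkConf

    // Rough sketch of enabling dynamic allocation; tune the executor bounds for your cluster.
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")        // typically required alongside dynamic allocation
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.maxExecutors", "20")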
I'm a total noob at Hadoop, Spark, and big data in general. However, I have been doing research on Spark and Hadoop for a while now.
From what I see, Hadoop is harder to program in. This is an official example; it simply searches for a pattern in all the files inside a folder and, if it finds it, writes the results to a file in another folder: https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/Grep.java
In the programming guide for Spark, however, all the code looks much, much shorter: http://spark.apache.org/docs/latest/programming-guide.html
...even when it's still Java (and there's also Scala, Python, and some R integration).
Yeah, that's their thing that parses SQL queries and runs them as Spark operations. So that would count as an example of translating SQL into the operations shown on this page.
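A small, made-up example of that translation in action (table and column names are invented):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("sql-sketch").getOrCreate()
    import spark.implicits._

    // Made-up data: (title, genre)
    val pages = Seq(("Battle of Hastings", "history"), ("Photosynthesis", "science")).toDF("title", "genre")
    pages.createOrReplaceTempView("pages")

    // Spark SQL parses this query and runs it as the same kind of DataFrame/RDD operations.
    val history = spark.sql("SELECT title FROM pages WHERE genre = 'history'")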
Language: Scala
For: Distributed computing big data with Spark
Reason: It's what Spark is written in and it seems like a nice alternative to Java. I'm having a lot of fun with it. So long to verbose Java code and MapReduce!
Sounds like a machine learning problem. Does your dataset contain info about pages that those users like?
You could apply supervised learning algorithms - multiclass classification. If your dataset is larger than your memory, I recommend having a look at http://spark.apache.org/ and http://spark.apache.org/docs/latest/mllib-guide.html
Hope it helps.
Thanks for the suggestion. After hours of research yesterday, I found that there are two major ways to build a recommendation system:
1) Collaborative filtering
2) Content-based recommender systems
What I'm looking for is the content-based recommender system. Matrix factorization is usually useful for the former. I'm not looking for empty attributes to be filled; I'm looking for recommendations on a new set of (complete) data based on an old set of data.
I found that Naive Bayes classifiers are a great way to do the latter. Luckily, MLlib has an implementation.
I'm passing each of my song attributes ("danceability", "energy", "key", "loudness", "tempo", "time_signature") as a dense vector. Training NaiveBayes on the new set of data and testing it with the old set of data did the trick for me.
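Roughly what that looks like with the RDD-based MLlib API (the feature values and labels below are invented, and sc is an existing SparkContext; note that MLlib's NaiveBayes expects nonnegative feature values, so attributes like loudness need rescaling first):

    import org.apache.spark.mllib.classification.NaiveBayes
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    // One LabeledPoint per song: label = category id, features = the six audio attributes
    // (danceability, energy, key, rescaled loudness, tempo, time_signature). Values are made up.
    val songs = sc.parallelize(Seq(
      LabeledPoint(0.0, Vectors.dense(0.71, 0.93, 5.0, 0.62, 128.0, 4.0)),
      LabeledPoint(1.0, Vectors.dense(0.22, 0.35, 2.0, 0.18, 80.0, 3.0))
    ))

    val model = NaiveBayes.train(songs, lambda = 1.0)
    val prediction = model.predict(Vectors.dense(0.65, 0.88, 7.0, 0.55, 120.0, 4.0))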
> Do you think it's worth learning clojure or Scala given a background in Java?
Depends much on what you want to accomplish.
If you want to work on a platform which tends to prefer Scala (say, Apache Spark), of course it's worth it to learn Scala. If you're interested in functional programming, Scala could be useful there too (but if that's your goal there are more strictly functional languages that might be better for that).
If you want to learn a lisp-like language, I think Clojure's the best one out there these days; and with a background in Java you'll already know what libraries are available. It seems like a lot less typing than Java or even Scala to accomplish something too.
> Or are there better more common/applicable languages to work on?
Well, of course there are. JavaScript runs everything from browsers to servers (Node.js). Python (Jython if you want to run it on a JVM) is used by practically all Linux shops, and Microsoft seems to like C# these days. Lots of websites use Ruby. And many important open source projects (Linux itself, Postgres, etc.) are C.
I like Scala - I'm using it with Spark/Shark. I've also used JRuby for real work (mostly due to constraints where a Ruby application had to run on a Windows machine, and the target machines at a customer site had a JVM installed and would have had painful hoops to jump through to install native Ruby). I've played with Clojure and think I would like it, but haven't used it for any real work yet.
But in practice, I do type more lines of C, Ruby, and Java than the other languages.
Spark is the cool new tool for distributed processing across different application fields.
Other than that, I can point you to my own Scala resource catalog page, with categories.
Correct... they aren't for purchase yet... that was the Aug 17 announce date. As of now, of the mainframes plugged into the wall and bought by customers, 0% run native Linux.
Much of the move away from z/OS is because of "big data" things like Spark, which is like Hadoop but leverages the mainframe architecture.
The obvious realization is that at an entry point of $250,000, IBM has a hard time selling devs on why their mainframe architecture or software stack is a good idea. This is the same logic that made Oracle release the Express Edition, or, many years ago, made Sun give Solaris away for free on x86 - dangling an obvious carrot on a string.
Microsoft also has IIS Express and SQL Server Express along the same business lines; I don't think this business practice has actually worked for any of the players, but who knows - I am not the head of a successful multi-billion-dollar transnational corporation, so perhaps their business decisions are more relevant than my opinion. Ho hum...
BTW, I was around in 1999 when SUSE Linux was run in VM/ESA on the frames for the first time. IBM's VM line goes back to the 1960s and is IBM's way of having virtual machines... you know, the whole hotness today with KVM, Docker, VirtualBox, Vagrant, VMware, Xen, or whatever the latest cool thing is - IBM was doing it before Intel got started and before Linus Torvalds was born.
Spark is just an engine for data processing. How "fast" it is depends mainly on how efficient your transformations are and how well the problem parallelises.
Also, when you ask "Do Spark handle incoming requests concurrently?", I get the distinct feeling you didn't bother to read the homepage at all.
You should try running Spark on your laptop even before the course, using the steps at http://spark.apache.org/docs/latest/quick-start.html. There's also a book on it now that might be worth a look.
Basically the major difference is that the Hive community wanted to change the existing Hive engine to run on Spark (in addition to MapReduce, Tez and other runtimes), whereas Spark SQL was an attempt to make a new SQL engine optimized only for Spark that can also access data sources beyond Hive. Spark SQL provides richer integration with the Spark API (e.g. through DataFrames, http://spark.apache.org/docs/latest/sql-programming-guide.html) and can also access non-Hive data sources (e.g. Cassandra, JDBC). In the long run though, it would be nice if Hive-on-Spark called into Spark SQL. In the short run, it was hard to refactor Hive to do that and the people there wanted to keep the existing code for query execution in Hive and just map it down at a lower level.
Out of curiosity, have you considered using Spark (http://spark.apache.org/) for your real time processing, or Shark for interactive query analysis? If you have considered them, what reasons made you go with Storm/Hive?
We've been working with Hadoop for a while and while it gets the job done, the big problem is it is s l o w. The new hotness is Spark, which is much better about keeping data in memory rather than on disk and is a couple orders of magnitude faster than Hadoop for typical jobs.